跳至主要內容

dubbo进阶之服务通信和负载均衡

soulballad分布式DubboDubbo约 4958 字大约 17 分钟

客户端生成的proxy

上节课,我们分析到消费者初始化完成之后,会生成一个proxy,而这个proxy本质上是一个动态代理类。

JavassistProxyFactory.getProxy

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(
        new InvokerInvocationHandler(invoker));
}

首先我们来分解一下,这个invoker实际上是:MockClusterWrapper(FailoverCluster(directory)),
然后通过 InvokerInvocationHandler 做了一层包装变成了
InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

1566668274799

proxy.getProxy

这个方法里面,会生成一个动态代理的方法,我们通过 debug 可以看到动态字节码的拼接过程。它代理了当前这个接口的方法 sayHello , 并且方法里面是使用 handler.invoke 进行调用的。

而handler又是这样一个实现:

InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

public java.lang.String sayHello(java.lang.String arg0){
    Object[] args = new Object[1]; 
    args[0] = ($w)$1; 
    Object ret = handler.invoke(this, methods[0], args); 
    return (java.lang.String)ret;
}

消费端调用的过程

handler的调用链路为:

InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

图解调用链

1566612244047

InvokerInvocationHandler.invoke

这个方法主要判断当前调用的远程方法,如果是 tostring、hashcode、equals,就直接返回否则,调用invoker.invoke,进入到 MockClusterWrapper.invoke 方法

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    //createInvocation,参数为目标方法名称和目标方法的参数,看起来似乎是组装一个传输的对象
    return invoker.invoke(createInvocation(method, args)).recreate();
}

MockClusterInvoker.invoke

Mock,在这里面有两个逻辑

  1. 是否客户端强制配置了mock调用,那么在这种场景中主要可以用来解决服务端还没开发好的时候直接使用本地数据进行测试
  2. 是否出现了异常,如果出现异常则使用配置好的Mock类来实现服务的降级
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    //从url中获得MOCK_KEY对应的value
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),
                    Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    //如果没有配置mock,则直接传递给下个invoker调用
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {//如果强制为本地调用,则执行 mockInvoke
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + "force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            //如果远程调用出现异常,则使用Mock进行处理
            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + "fail-mock enabled , url : " + directory.getUrl(), e);
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

AbstractClusterInvoker.invoke

下一个invoke,应该进入FailoverClusterInvoker,但是在这里它又用到了模版方法,所以直接进入到父类的invoke方法中

  1. 绑定attachments,Dubbo中,可以通过 RpcContext 上的 setAttachment 和 getAttachment 在服务消费方和提供方之间进行参数的隐式传递,所以这段代码中会去绑定 attachments;

    RpcContext.getContext().setAttachment("index", "1")
    
  2. 通过 list 获得 invoker 列表,这个列表基本可以猜测到是从directory里面获得的、但是这里面还实现了服务路由的逻辑,简单来说就是先拿到 invoker 列表,然后通过 router 进行服务路由,筛选出符合路由规则的服务提供者(暂时不细讲,属于另外一个逻辑)

  3. initLoadBalance 初始化负载均衡机制

  4. 执行 doInvoke

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // attachments,这个是什么意
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

initLoadBalance

不用看这个代码,基本也能猜测到,会从url中获得当前的负载均衡算法,然后使用spi机制来获得负载均衡的扩展点。然后返回一个具体的实现

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, 
                                      Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class)
            .getExtension(invokers.get(0).getUrl()
             .getMethodParameter(RpcUtils.getMethodName(invocation), 
                Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class)
            .getExtension(Constants.DEFAULT_LOADBALANCE);
    }
}

FailoverClusterInvoker.doInvoke

这段代码逻辑也很好理解,因为我们之前在讲 Dubbo 的时候说过容错机制,而 failover 是失败重试,所以这里面应该会实现容错的逻辑

  • 获得重试的次数,并且进行循环

  • 获得目标服务,并且记录当前已经调用过的目标服务防止下次继续将请求发送过去

  • 如果执行成功,则返回结果

  • 如果出现异常,判断是否为业务异常,如果是则抛出,否则,进行下一次重试

    1566612830971

  • 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息

  • Directory 代表多个 Invoker ,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更

  • Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker ,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个

  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等

  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, 
                       LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, 
                                          Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        //通过负载均衡获得目标invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);//记录已经调用过的服务,下次调用会进行过滤
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //服务调用成功,直接返回结果
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // 如果是业务异常,直接抛出不进行重试
                throw e;
            }
            le = e;//记录异常信息,进行下一次循环
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "+ methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

负载均衡

select

在调用invoker.invoke之前,会需要通过select选择一个合适的服务进行调用,而这个选择的过程其实就是负载均衡的实现

所有负载均衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。所以在分析负载均衡实现之前,先来看一下 AbstractLoadBalance 的逻辑。首先来看一下负载均衡的入口方法 select,如下:

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, 
                             Invocation invocation) {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    // 如果 invokers 列表中仅有一个 Invoker,直接返回即可,无需进行负载均衡
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    //调用 doSelect 方法进行负载均衡,该方法为抽象方法,由子类实现
    return doSelect(invokers, url, invocation);
}

负载均衡的子类实现有四个,默认情况下是RandomLoadBalance

randomLoadBalance

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, 
                                  Invocation invocation) {
    int length = invokers.size();
    int totalWeight = 0;
    boolean sameWeight = true;
    // 下面这个循环有两个作用,第一是计算总权重 totalWeight,
    // 第二是检测每个服务提供者的权重是否相同
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        // 累加权重
        totalWeight += weight;
        // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
        // 不相同的话,则将 sameWeight 置为 false。
        if (sameWeight && i > 0
            && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false;
        }
    }

    // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
    if (totalWeight > 0 && !sameWeight) {
        // 随机获取一个 [0, totalWeight) 区间内的数字
        int offset = random.nextInt(totalWeight);
        // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
        // 举例说明一下,servers = [A, B, C],weights = [5, 3, 2],offset = 7。
        // 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
        // 表明其不会落在服务器 A 对应的区间上。
        // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
        // 表明其会落在服务器 B 对应的区间上
        for (int i = 0; i < length; i++) {
            // 让随机值 offset 减去权重值
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                // 返回相应的 Invoker
                return invokers.get(i);
            }
        }
    }
    // 如果所有服务提供者权重值相同,此时直接随机返回一个即可
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

通过从RegistryDirectory中获得的invoker是什么呢?这个很重要,因为它决定了接下来的调用过程。

这个时候我们需要去了解这个invoker是在哪里初始化的?

可调用的Invoker初始化过程

RegistryDirectory

在RegistryDirectory中有一个成员属性,保存了服务地方地址对应的invoke信息

private volatile Map<String, Invoker<T>> urlInvokerMap;

toInvokers

这个invoker是动态的,基于注册中心的变化而变化的。它的初始化过程的链路是
RegistryDirectory.notify->refreshInvoker->toInvokers 下面的这段代码中

if (invoker == null) { // Not in the cache, refer again                
    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, 
                                    providerUrl);
}

是基于protocol.refer来构建的invoker,并且使用InvokerDelegate进行了委托,在dubboprotocol中,是这样构建invoker的。返回的是一个DubboInvoker对象

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, 
                                                  getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

所以这个invoker应该是:

InvokerDelegate(ProtocolFilterWrapper(ListenerInvokerWrapper(DubboInvoker())
ProtocolFilterWrapper->这个是一个invoker的过滤链路
ListenerInvokerWrapper-> 这里面暂时没做任何的实现

所以我们可以直接看到DubboInvoker这个类里面来

DubboInvoker

AbstractInvoker.invoke

这里面也是对 Invocation 的 attachments 进行处理,把 attachment 加入到 Invocation 中

这里的 attachment,实际上是目标服务的接口信息以及版本信息

DubboInvoker.doInvoker

这里面看到一个很熟悉的东西,就是ExchangeClient,这个是客户端和服务端之间的连接

然后如果当前方法有返回值,也就是isOneway=false,则执行else逻辑,然后通过异步的形式进行通信

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    //将目标方法以及版本好作为参数放入到Invocation中
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    //获得客户端连接
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //判断方法是否有返回值
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        //获得超时时间, 默认是1s
        int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, 
                                                  DEFAULT_TIMEOUT);
        if (isOneway) {//如果没有返回值
            boolean isSent = getUrl().getMethodParameter(methodName, 
                                                         Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = 
                currentClient.request(inv, timeout);
            responseFuture.whenComplete((obj, t) -> {
                if (t != null) {
                    asyncRpcResult.completeExceptionally(t);
                } else {
                    asyncRpcResult.complete((AppResponse) obj);
                }
            });
            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

currentClient.request

currentClient 还记得是一个什么对象吗?
它实际是一个 ReferenceCountExchangeClient(HeaderExchangeClient())

所以它的调用链路是
ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)

最终,把构建好的RpcInvocation,组装到一个Request对象中进行传递

public CompletableFuture<Object> request(Object request, int timeout) 
    throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // 创建请求对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        //NettyClient
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

channel.send 的调用链路
AbstractPeer.send ->AbstractClient.send->NettyChannel.send

通过 NioSocketChannel 把消息发送出去

ChannelFuture future = channel.writeAndFlush(message);

服务端接收消息的处理流程

客户端把消息发送出去之后,服务端会收到消息,然后把执行的结果返回到客户端

服务端接收到消息

服务端这边接收消息的处理链路,也比较复杂,我们回到 NettServer 中创建io的过程

bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            // FIXME: should we use getTimeout()?
            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
            NettyCodecAdapter adapter = 
                new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ch.pipeline()
                //.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                .addLast("decoder", adapter.getDecoder())
                .addLast("encoder", adapter.getEncoder())
                .addLast("server-idle-handler", 
                         new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                .addLast("handler", nettyServerHandler);
        }
    });

handler配置的是nettyServerHandler

server-idle-handler 表示心跳处理的机制

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否

handler.channelRead()

服务端收到读的请求是,会进入这个方法。

接着通过handler.received来处理msg,这个handle的链路很长,比较复杂,我们需要逐步剖析

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = 
        NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->

HeaderExchangeHandler-> 最后进入这个方法->DubboProtocol$requestHandler(receive)

  • MultiMessageHandler:复合消息处理
  • HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应
  • AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任务给线程池处理
  • DecodeHandler:业务解码处理器

服务端处理链路

1566614093448

HeaderExchangeHandler.received

交互层请求响应处理,有三种处理方式

  1. handlerRequest,双向请求

  2. handler.received 单向请求

  3. handleResponse 响应消息

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = 
        HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

ExchangeHandler.reply

接着进入到ExchangeHandler.reply这个方法中

把message转化为Invocation
调用getInvoker获得一个Invoker对象
然后通过Result result = invoker.invoke(inv); 进行调用

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) 
    throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    // 需要考虑到回调的问题
    if  (Boolean.TRUE.toString().equals(
        				inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext rpcContext = RpcContext.getContext();
    rpcContext.setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);

    if (result instanceof AsyncRpcResult) {
        return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);

    } else {
        return CompletableFuture.completedFuture(result);
    }
}                    

getInvoker

这里面是获得一个invoker的实现

DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

这段代码非常熟悉,exporterMap不就是我们之前在分析服务发布的过程中,保存的Invoker吗?

而key,就是对应的 interface:port 。

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);

    // if it's callback service on client side isStubServiceInvoke = 
    Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
        port = channel.getRemoteAddress().getPort();
    }

    //callback
    isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
    if (isCallBackServiceInvoke) {
        path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
        inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }

    String serviceKey = serviceKey(port, path, 
                                   inv.getAttachments().get(Constants.VERSION_KEY), 
                                   inv.getAttachments().get(Constants.GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>)exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: "+ serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); }

    return exporter.getInvoker();
}

exporterMap

Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

在服务发布时,实际上是把invoker包装成了DubboExpoter。然后放入到exporterMap中。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

invoker.invoke(inv)

接着调用invoker.invoke

那么再回忆一下,此时的invoker是一个什么呢?

invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxy
Invoker)))

最后一定会进入到这个代码里面

AbstractProxyInvoker

在AbstractProxyInvoker里面,doInvoker本质上调用的是wrapper.invokeMethod()

return new AbstractProxyInvoker<T>(proxy, type, url) {
    @Override
    protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes,
                              Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
};

而Wrapper是一个动态代理类,它的定义是这样的, 最终调用w.sayHello()方法进行处理

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) 
    throws java.lang.reflect.InvocationTargetException {
    com.gupaoedu.dubbo.practice.ISayHelloService w;
    try {
        w = ((com.gupaoedu.dubbo.practice.ISayHelloService) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        if ("sayHello".equals($2) && $3.length == 1) {
            return ($w) w.sayHello((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.gupaoedu.dubbo.practice.ISayHelloService.");
}

到此为止,服务端的处理过程就分析完了。

性能调优相关参数

常用的性能调优参数

1566614795054

各个参数的作用

1566614924994

1、当consumer发起一个请求时,首先经过active limit(参数actives)进行方法级别的限制,其实现方式为CHM中存放计数器(AtomicInteger),请求时加1,请求完成(包括异常)减1,如果超过actives则等待有其他请求完成后重试或者超时后失败;

2、从多个连接(connections)中选择一个连接发送数据,对于默认的netty实现来说,由于可以复用连接,默认一个连接就可以。不过如果你在压测,且只有一个consumer,一个provider,此时适当的加大connections确实能够增强网络传输能力。但线上业务由于有多个consumer多个provider,因此不建议增加connections参数;
3、连接到达provider时(如dubbo的初次连接),首先会判断总连接数是否超限(acceps),超过限制连接将被拒绝;
4、连接成功后,具体的请求交给io thread处理。io threads虽然是处理数据的读写,但io部分为异步,更多的消耗的是cpu,因此iothreads默认cpu个数+1是比较合理的设置,不建议调整此参数;
5、数据读取并反序列化以后,交给业务线程池处理,默认情况下线程池为fixed,且排队队列为 0(queues),这种情况下,最大并发等于业务线程池大小(threads),如果希望有请求的堆积能力,可以调整queues参数。如果希望快速失败由其他节点处理(官方推荐方式),则不修改queues,只调整 threads;
6、execute limit(参数executes)是方法级别的并发限制,原理与actives类似,只是少了等待的过程,即受限后立即失败

Dubbo的课程总结

Dubbo一共讲了6次课程,对于源码这块,我花了比较多的时间来分析,对于应用,由于比较简单,所以没做过多的讲解

第一节课

  • 关于为什么要用Dubbo
  • Dubbo的基本应用演示
  • Dubbo如何集成zookeeper实现服务注册
  • Dubbo的多协议支持以及监控平台的使用

第二节课

  • dubbo对于服务治理功能的支持
  • springboot整合dubbo
  • 负载均衡的配置
  • 集群容错的配置
  • 服务降级功能的使用
  • 主机绑定
  • 2.7新功能之配置中心和元数据中心

第三节课

  • dubbo的SPI机制
  • 自适应扩展点
  • 激活扩展点

第四节课

  • 服务的发布过程分析

第五节课

  • Invoker
  • 服务的注册源码分析
  • 消费端的proxy初始化源码分析

第六节课

  • 客户端实现请求传输
  • 服务端处理请求的逻辑

很多同学希望课程多讲讲实际应用,我实在不想把课程的 level 再往下拉了,正常来说,架构课程3个月可以讲完,主要以思想和方法论为主。为了照顾很多同学,才不断的去讲一些比较基础的应用。

实际上大家要明白的是,在你熟悉的领域里面学习,你虽然听起来很爽,但是收获太少。

上次编辑于:
贡献者: soulballad