

Author: 大大的周


Dubbo: service groups and multiple versions

Service groups: https://dubbo.apache.org/zh/docs/advanced/service-group/

Multiple versions: https://dubbo.apache.org/zh/docs/advanced/multi-versions/

Groups and multiple versions

Groups: when one interface has several implementations, the group attribute tells them apart.

<!-- Provider side -->
<dubbo:service group="feedback" interface="com.xxx.IndexService" />
<dubbo:service group="member" interface="com.xxx.IndexService" />
<!-- Consumer side: consume a specific group -->
<dubbo:reference id="feedbackIndexService" group="feedback" interface="com.xxx.IndexService" />
<dubbo:reference id="memberIndexService" group="member" interface="com.xxx.IndexService" />
<!-- Consumer side: consume any group -->
<dubbo:reference id="barService" interface="com.foo.BarService" group="*" />
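As a rough illustration of what the group attribute does, the sketch below models providers as (interface, group) pairs and filters them the way a consumer with a given group would, with * accepting any group. It is a toy model in plain Java, not Dubbo's matching code; GroupMatchSketch, Provider and select are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class GroupMatchSketch {
    /** Hypothetical provider entry: an interface name plus the group it was exported under. */
    static final class Provider {
        final String interfaceName;
        final String group;

        Provider(String interfaceName, String group) {
            this.interfaceName = interfaceName;
            this.group = group;
        }
    }

    /** Returns the providers visible to a consumer of the given group; "*" accepts any group. */
    static List<Provider> select(List<Provider> providers, String interfaceName, String group) {
        List<Provider> matched = new ArrayList<>();
        for (Provider p : providers) {
            boolean sameInterface = p.interfaceName.equals(interfaceName);
            boolean groupMatches = "*".equals(group) || Objects.equals(group, p.group);
            if (sameInterface && groupMatches) {
                matched.add(p);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Provider> providers = Arrays.asList(
            new Provider("com.xxx.IndexService", "feedback"),
            new Provider("com.xxx.IndexService", "member"));
        // A consumer pinned to "feedback" sees one provider; a "*" consumer sees both.
        System.out.println(select(providers, "com.xxx.IndexService", "feedback").size());
        System.out.println(select(providers, "com.xxx.IndexService", "*").size());
    }
}
```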

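Multiple versions: when an interface needs to be upgraded, version numbers tell the old and new implementations apart. Once everything has moved to the new version, the old version is taken offline.

<!-- Provider side, old version (1.0.0) -->
<dubbo:service interface="com.foo.BarService" version="1.0.0" />
<!-- Provider side, new version (2.0.0) -->
<dubbo:service interface="com.foo.BarService" version="2.0.0" />
<!-- Consumer uses the old version (1.0.0) -->
<dubbo:reference id="barService" interface="com.foo.BarService" version="1.0.0" />
<!-- Consumer uses the new version (2.0.0) -->
<dubbo:reference id="barService" interface="com.foo.BarService" version="2.0.0" />
<!-- Consumer accepts any version (*): providers that declare no version are not called -->
<dubbo:reference id="barService" interface="com.foo.BarService" version="*" />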
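The reason old and new versions can run side by side is that a consumer pinned to one version only sees providers of that version. The sketch below shows this with a plain list filter. It is a simplified model, not Dubbo code: VersionRoutingSketch, Provider, addressesFor and the addresses are made up for the example, and the * wildcard is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class VersionRoutingSketch {
    /** Hypothetical provider entry: a network address plus the version it exports. */
    static final class Provider {
        final String address;
        final String version;

        Provider(String address, String version) {
            this.address = address;
            this.version = version;
        }
    }

    /** Addresses a consumer pinned to consumerVersion would be allowed to call. */
    static List<String> addressesFor(List<Provider> providers, String consumerVersion) {
        return providers.stream()
            .filter(p -> consumerVersion.equals(p.version))
            .map(p -> p.address)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Mid-upgrade state: two providers still on 1.0.0, one already on 2.0.0.
        List<Provider> providers = Arrays.asList(
            new Provider("10.0.0.1:20880", "1.0.0"),
            new Provider("10.0.0.2:20880", "1.0.0"),
            new Provider("10.0.0.3:20880", "2.0.0"));
        System.out.println(addressesFor(providers, "1.0.0"));
        System.out.println(addressesFor(providers, "2.0.0"));
    }
}
```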

Version upgrade steps

1. During a low-traffic period, upgrade half of the providers to the new version.
2. Gradually switch the consumers over to the new version.
3. Once every consumer is on the new version, take the old-version providers offline.

How it works

AbstractProtocol: stores each exporter during service export so that it can be looked up later when a request arrives.

public abstract class AbstractProtocol implements Protocol, ScopeModelAware {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // key: group/serviceName:version:port
    // value: the exporter for that service (a DubboExporter for the dubbo protocol,
    //        an anonymous "new AbstractExporter" for the triple protocol)
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<>();
    /**
     * <host:port, ProtocolServer>
     */
    // used by the dubbo protocol: holds the servers created on the provider side
    protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
    // TODO SoftReference
    protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<>();
    protected FrameworkModel frameworkModel;
    @Override
    public void setFrameworkModel(FrameworkModel frameworkModel) {
        this.frameworkModel = frameworkModel;
    }
    protected static String serviceKey(URL url) {
        int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
        return serviceKey(port, url.getPath(), url.getVersion(), url.getGroup());
    }
    // builds the exporterMap key, in the format group/serviceName:version:port
    protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
    }
    // ... (remaining members omitted)
}
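To make the key format group/serviceName:version:port concrete, here is a small standalone sketch that builds such a key. It is a re-implementation for illustration only. ServiceKeySketch is a hypothetical name, and skipping an empty group or version is an assumption of this sketch. Dubbo's own ProtocolUtils.serviceKey may treat edge cases differently.

```java
final class ServiceKeySketch {
    /**
     * Builds a key of the form group/serviceName:version:port.
     * Assumption: a null or empty group or version is simply left out of the key.
     */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(serviceName);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    public static void main(String[] args) {
        // prints feedback/com.foo.BarService:1.0.0:20880
        System.out.println(serviceKey(20880, "com.foo.BarService", "1.0.0", "feedback"));
        // prints com.foo.BarService:20880
        System.out.println(serviceKey(20880, "com.foo.BarService", null, null));
    }
}
```

Because group and version are both part of the key, two exports of the same interface on the same port do not collide as long as either attribute differs.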

**************

dubbo protocol

Provider-side service export call stack

# DubboProtocol.export
at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.export(DubboProtocol.java:296)
# export through the protocol wrappers
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:76)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:66)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:61)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.export(ProtocolSerializationWrapper.java:47)
at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1)
at org.apache.dubbo.registry.integration.RegistryProtocol.lambda$doLocalExport$2(RegistryProtocol.java:292)
at org.apache.dubbo.registry.integration.RegistryProtocol$$Lambda$904/0x000000080104d0e0.apply(Unknown Source:-1)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
   - locked <0x1c0b> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at org.apache.dubbo.registry.integration.RegistryProtocol.doLocalExport(RegistryProtocol.java:290)
at org.apache.dubbo.registry.integration.RegistryProtocol.export(RegistryProtocol.java:237)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:74)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:64)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:58)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.export(ProtocolSerializationWrapper.java:47)
at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1)
# ServiceConfig
at org.apache.dubbo.config.ServiceConfig.doExportUrl(ServiceConfig.java:638)
at org.apache.dubbo.config.ServiceConfig.exportRemote(ServiceConfig.java:612)
at org.apache.dubbo.config.ServiceConfig.exportUrl(ServiceConfig.java:572)
at org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ServiceConfig.java:402)
at org.apache.dubbo.config.ServiceConfig.doExportUrls(ServiceConfig.java:388)
at org.apache.dubbo.config.ServiceConfig.doExport(ServiceConfig.java:363)
   - locked <0x1c0c> (a org.apache.dubbo.config.spring.ServiceBean)
at org.apache.dubbo.config.ServiceConfig.export(ServiceConfig.java:237)
# DefaultModuleDeployer
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServiceInternal(DefaultModuleDeployer.java:338)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServices(DefaultModuleDeployer.java:310)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:142)
   - locked <0x1c0d> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
# DubboDeployApplicationListener
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)
# SimpleApplicationEventMulticaster
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
# AbstractApplicationContext
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
   - locked <0x1c0e> (a java.lang.Object)
# ServletWebServerApplicationContext
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
# SpringApplication
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
# application entry point
at com.example.demo.DemoApplication.main(DemoApplication.java:12)

Provider-side service export

public class DubboProtocol extends AbstractProtocol {
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        checkDestroyed();
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        // create the DubboExporter; its constructor puts the exporter into exporterMap
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //export a stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
                }
            }
        }
        // create the provider-side server and open the service port
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
    // ... (remaining members omitted)
}

DubboExporter

public class DubboExporter<T> extends AbstractExporter<T> {
    private final String key;
    private final Map<String, Exporter<?>> exporterMap;
    // creating a DubboExporter also registers it in exporterMap under its key
    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }
    @Override
    public void afterUnExport() {
        exporterMap.remove(key);
    }
}
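The register-on-construct, remove-on-unexport pattern used by DubboExporter can be reduced to a few lines. The sketch below is a stripped-down stand-in that only tracks the map entry. ExporterRegistrySketch, SimpleExporter and unexport are hypothetical names, and the real class also wraps an Invoker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExporterRegistrySketch {
    /** Minimal stand-in for an exporter that manages its own entry in a shared map. */
    static final class SimpleExporter {
        private final String key;
        private final Map<String, SimpleExporter> exporterMap;

        SimpleExporter(String key, Map<String, SimpleExporter> exporterMap) {
            this.key = key;
            this.exporterMap = exporterMap;
            exporterMap.put(key, this); // self-registration, as in the DubboExporter constructor
        }

        void unexport() {
            exporterMap.remove(key); // mirrors afterUnExport()
        }
    }

    public static void main(String[] args) {
        Map<String, SimpleExporter> exporterMap = new ConcurrentHashMap<>();
        SimpleExporter exporter = new SimpleExporter("feedback/com.xxx.IndexService:20880", exporterMap);
        System.out.println(exporterMap.keySet()); // the key is present after construction
        exporter.unexport();
        System.out.println(exporterMap.isEmpty()); // true once the service is unexported
    }
}
```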

Consumer-side call stack when sending a request to the provider

# FailoverClusterInvoker.doInvoke triggers the service call (the consumer's ordinary filters run after this point)
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80)
at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332)
# cluster filters: ConsumerContextFilter, FutureFilter, MonitorFilter
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:51)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter.invoke(ConsumerContextFilter.java:108)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187)
at org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster$ClusterFilterInvoker.invoke(AbstractCluster.java:92)
at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:95)
at org.apache.dubbo.registry.client.migration.MigrationInvoker.invoke(MigrationInvoker.java:276)
at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:92)
at org.apache.dubbo.common.bytecode.proxy1.hello(proxy1.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at jdk.proxy2.$Proxy64.hello(Unknown Source:-1)
# application method that makes the call
at com.example.demo.controller.HelloController.hello(HelloController.java:28)
# reflection
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
# web request handling
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
   - locked <0x263e> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
# thread pool
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:833)

FailoverClusterInvoker

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                // trigger the service call; both invoker and invocation carry the call information
                Result result = invokeWithContext(invoker, invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion()
                        + ". Last error is: " + le.getMessage(), le);
                }
                success = true;
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                if (!success) {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName
            + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost()
            + " using the dubbo version " + Version.getVersion()
            + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    private int calculateInvokeTimes(String methodName) {
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        RpcContext rpcContext = RpcContext.getClientAttachment();
        Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
        if (retry instanceof Number) {
            len = ((Number) retry).intValue() + 1;
            rpcContext.removeAttachment(RETRIES_KEY);
        }
        if (len <= 0) {
            len = 1;
        }
        return len;
    }
}
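The retry loop in doInvoke boils down to: make retries + 1 attempts, prefer a provider that has not been tried yet, and keep the last failure for the final error. The following sketch shows that shape in plain Java. It is a simplified illustration, not the Dubbo implementation: FailoverSketch, invokeWithRetries and pick are hypothetical names, provider selection here is a simple first-untried scan rather than a load balancer, and business exceptions are not treated specially.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {
    /** Calls providers one after another until one succeeds or retries + 1 attempts are used up. */
    static <T> T invokeWithRetries(List<String> providers, int retries, Function<String, T> call) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        int attempts = Math.max(retries + 1, 1); // same floor as calculateInvokeTimes
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = pick(providers, tried);
            tried.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempts, tried " + tried, last);
    }

    /** Prefers a provider that has not been tried; otherwise cycles through the list again. */
    static String pick(List<String> providers, List<String> tried) {
        for (String p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(tried.size() % providers.size());
    }

    public static void main(String[] args) {
        // Provider "a" is down, so the second attempt lands on "b".
        String result = invokeWithRetries(Arrays.asList("a", "b"), 2, p -> {
            if (p.equals("a")) {
                throw new RuntimeException("a is down");
            }
            return "ok:" + p;
        });
        System.out.println(result); // prints ok:b
    }
}
```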

Provider-side call stack on receiving a consumer request

# the service method that is finally called: HelloServiceImpl.hello
at com.example.demo.service.impl.HelloServiceImpl.hello(HelloServiceImpl.java:16)
# invoke
at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java:-1)
at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:47)
at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:86)
at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:55)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
# filter chain
at org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter.invoke(ClassLoaderCallbackFilter.java:38)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter.invoke(TraceFilter.java:78)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.TimeoutFilter.invoke(TimeoutFilter.java:46)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ExceptionFilter.invoke(ExceptionFilter.java:52)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.auth.filter.ProviderAuthFilter.invoke(ProviderAuthFilter.java:53)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:131)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:197)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:46)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187)
# request handling
# (the next frame is requestHandler.reply, implemented by the anonymous ExchangeHandlerAdapter in DubboProtocol)
at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol$1.reply(DubboProtocol.java:152)
at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:100)
at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:175)
at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:51)
# thread pool
at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at org.apache.dubbo.common.threadlocal.InternalRunnable.run(InternalRunnable.java:41)
at java.lang.Thread.run(Thread.java:833)

Provider-side handling of a consumer request

public class DubboProtocol extends AbstractProtocol {
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
            // cast to an Invocation, which carries the call details (group, method to call, version, ...)
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            inv.setServiceModel(invoker.getUrl().getServiceModel());
            // switch TCCL
            if (invoker.getUrl().getServiceModel() != null) {
                Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
            }
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                        + " not found in callback service interface ,invoke will be ignored."
                        + " please update the api interface. url is:"
                        + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
            // invoke the method and obtain its result
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }
        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, ON_CONNECT_KEY);
        }
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isDebugEnabled()) {
                logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, ON_DISCONNECT_KEY);
        }
        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }
        /**
         * FIXME channel.getUrl() always binds to a fixed service, and this service is random.
         * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
         * service this connection is binding to.
         * @param channel
         * @param url
         * @param methodKey
         * @return
         */
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method,
                url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getGroup());
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getVersion());
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke;
        boolean isStubServiceInvoke;
        int port = channel.getLocalAddress().getPort();
        String path = (String) inv.getObjectAttachments().get(PATH_KEY);
        // if it's callback service on client side
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }
        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
            inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        // build the serviceKey from the invocation's attachments
        String serviceKey = serviceKey(
            port,
            path,
            (String) inv.getObjectAttachments().get(VERSION_KEY),
            (String) inv.getObjectAttachments().get(GROUP_KEY)
        );
        // look the exporter up in the in-memory exporterMap
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey
                + " in " + exporterMap.keySet() + ", may be version or group mismatch "
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()
                + ", message:" + getInvocationWithoutData(inv));
        }
        // the invoker is obtained from the exporter
        return exporter.getInvoker();
    }
    // ... (remaining members omitted)
}
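The lookup in getInvoker is what enforces groups and versions on the provider side: the request's attachments are turned into a key, and a request whose group or version differs from what was exported simply finds nothing in the map. The sketch below replays that with plain strings standing in for invokers. ExporterLookupSketch, key and lookup are hypothetical names, and the attachment names path, group and version are assumed here for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ExporterLookupSketch {
    /** Builds group/path:version:port, leaving out an empty group or version (an assumption of this sketch). */
    static String key(String group, String path, String version, int port) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.append(':').append(port).toString();
    }

    /** Finds the exported service for a request, or fails the way a group or version mismatch would. */
    static String lookup(Map<String, String> exporterMap, Map<String, String> attachments, int port) {
        String serviceKey = key(attachments.get("group"), attachments.get("path"), attachments.get("version"), port);
        String invoker = exporterMap.get(serviceKey);
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey
                + " in " + exporterMap.keySet() + ", may be version or group mismatch");
        }
        return invoker;
    }

    public static void main(String[] args) {
        Map<String, String> exporterMap = new HashMap<>();
        exporterMap.put(key(null, "com.foo.BarService", "1.0.0", 20880), "BarService v1 invoker");

        Map<String, String> request = new HashMap<>();
        request.put("path", "com.foo.BarService");
        request.put("version", "1.0.0");
        System.out.println(lookup(exporterMap, request, 20880)); // found: versions match

        request.put("version", "2.0.0");
        try {
            lookup(exporterMap, request, 20880);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the 2.0.0 request finds no exporter
        }
    }
}
```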

**************

triple protocol

Service export call stack

# TripleProtocol.export
at org.apache.dubbo.rpc.protocol.tri.TripleProtocol.export(TripleProtocol.java:58)
# export through the protocol wrappers
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:66)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:76)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:61)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.export(ProtocolSerializationWrapper.java:47)
at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1)
at org.apache.dubbo.registry.integration.RegistryProtocol.lambda$doLocalExport$2(RegistryProtocol.java:292)
at org.apache.dubbo.registry.integration.RegistryProtocol$$Lambda$903/0x000000080105d0c0.apply(Unknown Source:-1)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
   - locked <0x1c4c> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
at org.apache.dubbo.registry.integration.RegistryProtocol.doLocalExport(RegistryProtocol.java:290)
at org.apache.dubbo.registry.integration.RegistryProtocol.export(RegistryProtocol.java:237)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export(ProtocolListenerWrapper.java:64)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.export(QosProtocolWrapper.java:74)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.export(ProtocolFilterWrapper.java:58)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.export(ProtocolSerializationWrapper.java:47)
at org.apache.dubbo.rpc.Protocol$Adaptive.export(Protocol$Adaptive.java:-1)
# ServiceConfig
at org.apache.dubbo.config.ServiceConfig.doExportUrl(ServiceConfig.java:638)
at org.apache.dubbo.config.ServiceConfig.exportRemote(ServiceConfig.java:612)
at org.apache.dubbo.config.ServiceConfig.exportUrl(ServiceConfig.java:572)
at org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol(ServiceConfig.java:402)
at org.apache.dubbo.config.ServiceConfig.doExportUrls(ServiceConfig.java:388)
at org.apache.dubbo.config.ServiceConfig.doExport(ServiceConfig.java:363)
   - locked <0x1c4d> (a org.apache.dubbo.config.spring.ServiceBean)
at org.apache.dubbo.config.ServiceConfig.export(ServiceConfig.java:237)
# DefaultModuleDeployer
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServiceInternal(DefaultModuleDeployer.java:338)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.exportServices(DefaultModuleDeployer.java:310)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:142)
   - locked <0x1c4e> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
# DubboDeployApplicationListener
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)
# SimpleApplicationEventMulticaster
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
# AbstractApplicationContext
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
   - locked <0x1c4f> (a java.lang.Object)
# ServletWebServerApplicationContext
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
# SpringApplication
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
# application entry point
at com.example.demo.DemoApplication.main(DemoApplication.java:12)

TripleExport

public class TripleProtocol extends AbstractProtocol implements Protocol {

    private static final Logger logger = LoggerFactory.getLogger(TripleProtocol.class);
    private final PathResolver pathResolver;
    private final TriBuiltinService triBuiltinService;

    public TripleProtocol(FrameworkModel frameworkModel) {
        this.triBuiltinService = new TriBuiltinService(frameworkModel);
        this.pathResolver = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
    }

    @Override
    public int getDefaultPort() {
        return 50051;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        String key = serviceKey(url); // build the key, format: group/serviceName:version:port
        final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void afterUnExport() {
                pathResolver.remove(url.getServiceKey());
                pathResolver.remove(url.getServiceInterface());
                exporterMap.remove(key);
            }
        };
        exporterMap.put(key, exporter); // store the exporter in exporterMap
        invokers.add(invoker);
        pathResolver.add(url.getServiceKey(), invoker);
        pathResolver.add(url.getServiceInterface(), invoker);
        // set service status
        triBuiltinService.getHealthStatusManager().setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
        triBuiltinService.getHealthStatusManager().setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
        PortUnificationExchanger.bind(invoker.getUrl());
        return exporter;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        TripleInvoker<T> invoker;
        try {
            url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
            url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
            ExecutorRepository executorRepository = url.getOrDefaultApplicationModel()
                    .getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
            executorRepository.createExecutorIfAbsent(url);
            invoker = new TripleInvoker<>(type, url, invokers);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        invokers.add(invoker);
        return invoker;
    }

    @Override
    protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
        return null;
    }

    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroying protocol [" + this.getClass().getSimpleName() + "] ...");
        }
        PortUnificationExchanger.close();
        pathResolver.destroy();
        super.destroy();
    }
}
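To make the role of the exporterMap key concrete, here is a minimal sketch of how a key shaped like group/serviceName:version:port keeps different groups and versions of one interface apart. The class ServiceKeySketch and its serviceKey method are hypothetical stand-ins written for illustration; they are not Dubbo's ProtocolUtils.serviceKey, and the rule of skipping empty parts is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the key building done during export; not Dubbo code.
class ServiceKeySketch {

    // Builds a key shaped like group/serviceName:version:port, skipping empty parts (assumption).
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(serviceName);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    public static void main(String[] args) {
        // Stand-in for exporterMap; values are plain strings instead of Exporter objects.
        Map<String, String> exporterMap = new ConcurrentHashMap<>();
        exporterMap.put(serviceKey(50051, "com.foo.BarService", "1.0.0", "feedback"), "exporter for feedback 1.0.0");
        exporterMap.put(serviceKey(50051, "com.foo.BarService", "2.0.0", "feedback"), "exporter for feedback 2.0.0");
        exporterMap.put(serviceKey(50051, "com.foo.BarService", "1.0.0", "member"), "exporter for member 1.0.0");

        // Same interface and port, but three distinct keys, so three separate entries.
        System.out.println(exporterMap.size());
        System.out.println(exporterMap.get("feedback/com.foo.BarService:2.0.0:50051"));
    }
}
```

Because group and version are both part of the key, two implementations of the same interface on the same port never overwrite each other in the map.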

Consumer side, sending the request: the call path is the same as for the dubbo protocol; only the parameters differ

# FailoverClusterInvoker.doInvoke triggers the service call (the consumer's ordinary filters run after this point)
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80)
at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332)
# Cluster filters: ConsumerContextFilter, FutureFilter, MonitorFilter
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:51)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter.invoke(ConsumerContextFilter.java:108)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187)
at org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster$ClusterFilterInvoker.invoke(AbstractCluster.java:92)
at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:95)
at org.apache.dubbo.registry.client.migration.MigrationInvoker.invoke(MigrationInvoker.java:276)
at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:92)
at org.apache.dubbo.common.bytecode.proxy1.hello(proxy1.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at jdk.proxy2.$Proxy64.hello(Unknown Source:-1)
# Application entry method for the call
at com.example.demo.controller.HelloController.hello(HelloController.java:28)
# Reflection
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:568)
# Web request handling
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
- locked <0x263e> (a org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper)
# Thread pool handling
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:833)

FailoverClusterInvoker

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                Result result = invokeWithContext(invoker, invocation); // invoke the service
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                success = true;
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                if (!success) {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

    private int calculateInvokeTimes(String methodName) {
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        RpcContext rpcContext = RpcContext.getClientAttachment();
        Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
        if (retry instanceof Number) {
            len = ((Number) retry).intValue() + 1;
            rpcContext.removeAttachment(RETRIES_KEY);
        }
        if (len <= 0) {
            len = 1;
        }
        return len;
    }
}
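The retry behaviour of doInvoke can be reduced to a small sketch: the number of attempts is retries + 1 (never fewer than one), and each failed attempt moves on to another provider. Everything below is illustrative only. FailoverSketch, invokeTimes and invoke are hypothetical names, round-robin selection stands in for the load balancer, and the special handling of business exceptions is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified illustration of a failover loop; these names are hypothetical, not Dubbo APIs.
class FailoverSketch {

    // Mirrors the arithmetic in calculateInvokeTimes: attempts = retries + 1, at least one.
    static int invokeTimes(int retries) {
        int len = retries + 1;
        return len <= 0 ? 1 : len;
    }

    // Tries providers round-robin (a stand-in for load-balanced selection) until one succeeds.
    static String invoke(List<String> providers, Function<String, String> call, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int len = invokeTimes(retries);
        RuntimeException last = null;
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < len; i++) {
            String provider = providers.get(i % providers.size());
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e;             // remember the last failure
                failed.add(provider); // record which provider failed
            }
        }
        throw new RuntimeException("Tried " + len + " times, failed providers " + failed, last);
    }

    public static void main(String[] args) {
        List<String> providers = List.of("10.0.0.1:50051", "10.0.0.2:50051");
        // The first address always fails, so the call succeeds on the second attempt.
        Function<String, String> call = address -> {
            if (address.startsWith("10.0.0.1")) {
                throw new RuntimeException("connection refused: " + address);
            }
            return "hello from " + address;
        };
        System.out.println(invoke(providers, call, 2));
        System.out.println(invokeTimes(-5));
    }
}
```

With retries set to 2, up to three attempts are made; a negative retry count still yields exactly one attempt, matching the `len <= 0` guard in the source.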

Call stack on the provider side when it handles a consumer request

# The provider method that is finally invoked
at com.example.demo.service.impl.HelloServiceImpl.hello(HelloServiceImpl.java:16)
# Invocation
at org.apache.dubbo.common.bytecode.Wrapper3.invokeMethod(Wrapper3.java:-1)
at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:47) // proxy object created with javassist
at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:86)
at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:55)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
# Filter chain
at org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter.invoke(ClassLoaderCallbackFilter.java:38)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter.invoke(TraceFilter.java:78)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.TimeoutFilter.invoke(TimeoutFilter.java:46)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ExceptionFilter.invoke(ExceptionFilter.java:52)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.auth.filter.ProviderAuthFilter.invoke(ProviderAuthFilter.java:53)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:131)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:197)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:46)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313)
at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187)
# Provider-side request handling
at org.apache.dubbo.rpc.protocol.tri.UnaryServerStream$UnaryServerTransportObserver.invoke(UnaryServerStream.java:71)
at org.apache.dubbo.rpc.protocol.tri.UnaryServerStream$UnaryServerTransportObserver.lambda$onComplete$0(UnaryServerStream.java:58)
at org.apache.dubbo.rpc.protocol.tri.UnaryServerStream$UnaryServerTransportObserver$$Lambda$974/0x0000000801185858.run(Unknown Source:-1)
at org.apache.dubbo.rpc.protocol.tri.AbstractServerStream.lambda$execute$0(AbstractServerStream.java:341)
at org.apache.dubbo.rpc.protocol.tri.AbstractServerStream$$Lambda$975/0x0000000801185a80.run(Unknown Source:-1)
# Provider-side thread pool handling
at org.apache.dubbo.common.threadpool.serial.SerializingExecutor.run(SerializingExecutor.java:99)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:833)

UnaryServerStream

public class UnaryServerStream extends AbstractServerStream implements Stream {

    protected UnaryServerStream(URL url) {
        super(url);
    }

    @Override
    protected StreamObserver<Object> createStreamObserver() {
        return null;
    }

    @Override
    protected InboundTransportObserver createInboundTransportObserver() {
        return new UnaryServerTransportObserver();
    }

    private class UnaryServerTransportObserver extends ServerUnaryInboundTransportObserver {

        @Override
        public void onError(GrpcStatus status) {
            transportError(status);
        }

        @Override
        public void onComplete() {
            execute(() -> {
                if (getData() != null) {
                    invoke();
                } else {
                    onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
                            .withDescription("Missing request data"));
                }
            });
        }

        public void invoke() {
            RpcInvocation invocation = buildUnaryInvocation(getHeaders(), getData());
            if (invocation == null) {
                return;
            }
            final Result result = getInvoker().invoke(invocation); // invoke the provider and obtain the result
            CompletionStage<Object> future = result.thenApply(Function.identity());
            future.whenComplete((o, throwable) -> {
                if (throwable != null) {
                    LOGGER.error("Invoke error", throwable);
                    transportError(getStatus(throwable));
                    return;
                }
                AppResponse response = (AppResponse) o;
                if (response.hasException()) {
                    transportError(getStatus(response.getException()));
                    return;
                }
                Metadata metadata = createResponseMeta();
                outboundTransportObserver().onMetadata(metadata, false);
                final byte[] data = encodeResponse(response.getValue());
                if (data == null) {
                    // already handled in encodeResponse()
                    return;
                }
                outboundTransportObserver().onData(data, false);
                Metadata trailers = TripleConstant.getSuccessResponseMeta();
                convertAttachment(trailers, response.getObjectAttachments());
                outboundTransportObserver().onMetadata(trailers, true);
            });
            RpcContext.removeContext();
        }
    }
}

Annotation usage example

Provider side

@DubboService(group = "aaa", version = "1.0.0")
public class HelloServiceImpl implements HelloService {

    @Override
    public List<String> hello() {
        System.out.println("hello provider");
        return Collections.singletonList("hello provider");
    }
}

Consumer side

@RestController
public class HelloController {

    @DubboReference(group = "aaa", version = "*")
    private HelloService helloService;

    @RequestMapping("/hello")
    public String hello() {
        helloService.hello().forEach(System.out::print);
        return "hello consumer 2";
    }
}
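As a rough way to reason about which provider the reference above can reach, the sketch below models group and version selection as a simple match in which "*" accepts any value. This is an assumed simplification for illustration: GroupVersionMatchSketch is a hypothetical class, not Dubbo's matching code, and it deliberately says nothing about providers that declare no version at all.

```java
import java.util.Objects;

// Hypothetical matcher illustrating group/version selection; a simplification, not Dubbo's routing code.
class GroupVersionMatchSketch {

    static final String ANY = "*";

    // A consumer value of "*" accepts any provider value; otherwise the two values must be equal.
    static boolean matchesPart(String consumerValue, String providerValue) {
        return ANY.equals(consumerValue) || Objects.equals(consumerValue, providerValue);
    }

    // Both the group and the version have to match for the provider to be eligible.
    static boolean matches(String consumerGroup, String consumerVersion,
                           String providerGroup, String providerVersion) {
        return matchesPart(consumerGroup, providerGroup)
                && matchesPart(consumerVersion, providerVersion);
    }

    public static void main(String[] args) {
        // Consumer: group "aaa", version "*"; provider: group "aaa", version "1.0.0".
        System.out.println(matches("aaa", "*", "aaa", "1.0.0"));
        // Same consumer against a provider in another group.
        System.out.println(matches("aaa", "*", "bbb", "1.0.0"));
        // A pinned consumer version against a different provider version.
        System.out.println(matches("aaa", "2.0.0", "aaa", "1.0.0"));
    }
}
```

Under this model the consumer in the example reaches HelloServiceImpl because the groups are equal and the wildcard version accepts 1.0.0, while changing either the group or a pinned version would exclude it.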

