Python 官方文档:入门教程 => 点击学习
目录Provider端线程模型AllDispatcherDirectDispatcherExecutionDispatcherMessageOnlyDispatcherConnect
在了解服务端线程模型之前,先了解一下dubbo对Channel上的操作抽象,Dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为org.apache.dubbo.remoting.ChannelHandler
,该接口是SPI
的,用户可以自己扩展,接口代码如下:
该接口抽象的五种Channel上的行为解释如下:
Dubbo框架的线程模型与以上这五种行为息息相关,Dubbo协议Provider端线程模型提供了五种实现,虽说都是五种但是别把二者混淆,线程模型的顶级接口是org.apache.dubbo.remoting.Dispatcher
,该接口也是SPI的,提供的五种实现分别是AllDispatcher
、DirectDispatcher
、MessageOnlyDispatcher
、ExecutionDispatcher
、ConnectionOrderedDispatcher
,默认的使用的是AllDispatcher
。
org.apache.dubbo.remoting.ChannelHandler
作为Channel上的行为的顶级接口对应Dubbo协议Provider端的5种线程模型同样也提供了对应的5种实现,分别是AllChannelHandler
、DirectChannelHandler
、MessageOnlyChannelHandler
、ExecutionChannelHandler
、ConnectionOrderedChannelHandler
,这里Channel上行为的具体实现不展开讨论。
Channel上行为和线程模型之间使用策略可以参考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers
的源代码,这里不做详细的介绍,下面的各个章节只针对5种线程模型做简单的介绍。
IO线程上的操作:
Dubbo线程池上的操作:
AllDispatcher
代码如下,AllDispatcher
的dispatch
方法实例化了AllChannelHandler
,AllChannelHandler
实现了received、connected、disconnected、caught操作在dubbo线程池中,代码如下:
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
该线程模型Channel上的所有行为均在IO线程中执行,并没有在Dubbo线程池中执行
DirectDispatcher
与AllDispatcher
相似,实例化了DirectChannelHandler
,DirectChannelHandler
只实现了received行为,但是received中获取的线程池如果是ThreadlessExecutor
才会提交task,否则也是在ChannelHandler中执行received行为,ThreadlessExecutor
和普通线程池最大的区别是不会管理任何线程,这里不展开讨论。
public class DirectDispatcher implements Dispatcher {
public static final String NAME = "direct";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new DirectChannelHandler(handler, url);
}
}
public class DirectChannelHandler extends WrappedChannelHandler {
public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
if (executor instanceof ThreadlessExecutor) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
handler.received(channel, message);
}
}
}
在IO线程中执行的操作有:
在Dubbo线程中执行的操作有:
同样的,我们可以直接看ExecutionChannelHandler
源码,逻辑是当message的类型是Request
时received行为在Dubbo线程池执行。感兴趣的可以自己看源码,这里不做介绍。
Message Only Dispatcher所有的received行为和反序列化都是在dubbo线程池中执行的
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
该线程模型与AllDispatcher
类似,sent操作和相应的序列化是在IO线程上执行;connected、disconnected、received、caught操作在dubbo线程池上执行,他们的区别是在connected、disconnected行为上ConnectionOrderedDispatcher
做了线程池隔离,并且在Dubbo connected thread pool中提供了链接限制、告警灯能力,我们直接看ConnectionOrderedChannelHandler
源码,代码如下:
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queueWarningLimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
@Override
public void connected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (message instanceof Request && t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queueWarningLimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));
}
}
}
到此这篇关于Java Dubbo协议下的服务端线程使用详解的文章就介绍到这了,更多相关Java Dubbo服务端线程内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: JavaDubbo协议下的服务端线程使用详解
本文链接: https://lsjlt.com/news/197983.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0