diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java index 548e891c80..9c3fbc5493 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java @@ -222,17 +222,21 @@ private void registerTCPRequestProcessor() { ListenProcessor listenProcessor = new ListenProcessor(this); registerProcessor(Command.LISTEN_REQUEST, listenProcessor, taskHandleExecutorService); + ThreadPoolExecutor sendExecutorService = super.getTcpThreadPoolGroup().getSendExecutorService(); MessageTransferProcessor messageTransferProcessor = new MessageTransferProcessor(this); - registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, taskHandleExecutorService); - registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService); - registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService); - registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService); + registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, sendExecutorService); + registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService); + registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService); + ThreadPoolExecutor replyExecutorService = super.getTcpThreadPoolGroup().getReplyExecutorService(); + registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, replyExecutorService); + + ThreadPoolExecutor ackExecutorService = super.getTcpThreadPoolGroup().getAckExecutorService(); MessageAckProcessor messageAckProcessor = new MessageAckProcessor(this); - registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService); - registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService); - registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService); - registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService); + registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService); + registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService); + registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService); + registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService); } public EventMeshServer getEventMeshServer() { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java index 1891192942..5f71a57f55 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TCPThreadPoolGroup.java @@ -30,6 +30,9 @@ public class TCPThreadPoolGroup implements ThreadPoolGroup { private final EventMeshTCPConfiguration eventMeshTCPConfiguration; private ScheduledExecutorService scheduler; private ThreadPoolExecutor taskHandleExecutorService; + private ThreadPoolExecutor sendExecutorService; + private ThreadPoolExecutor ackExecutorService; + private ThreadPoolExecutor replyExecutorService; private ThreadPoolExecutor broadcastMsgDownstreamExecutorService; public TCPThreadPoolGroup(EventMeshTCPConfiguration eventMeshTCPConfiguration) { @@ -45,9 +48,27 @@ public void initThreadPool() { taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor( eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(), eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(), - new LinkedBlockingQueue<>(10_000), + new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorQueueSize()), new EventMeshThreadFactory("eventMesh-tcp-task-handle", true)); + sendExecutorService = ThreadPoolFactory.createThreadPoolExecutor( + eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(), + eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(), + new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorQueueSize()), + new EventMeshThreadFactory("eventMesh-tcp-msg-send", true)); + + replyExecutorService = ThreadPoolFactory.createThreadPoolExecutor( + eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(), + eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(), + new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorQueueSize()), + new EventMeshThreadFactory("eventMesh-tcp-msg-reply", true)); + + ackExecutorService = ThreadPoolFactory.createThreadPoolExecutor( + eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(), + eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(), + new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorQueueSize()), + new EventMeshThreadFactory("eventMesh-tcp-msg-ack", true)); + broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor( eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(), eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(), @@ -59,6 +80,9 @@ public void initThreadPool() { public void shutdownThreadPool() { scheduler.shutdown(); taskHandleExecutorService.shutdown(); + sendExecutorService.shutdown();; + replyExecutorService.shutdown(); + ackExecutorService.shutdown(); broadcastMsgDownstreamExecutorService.shutdown(); } @@ -73,4 +97,16 @@ public ThreadPoolExecutor getTaskHandleExecutorService() { public ThreadPoolExecutor getBroadcastMsgDownstreamExecutorService() { return broadcastMsgDownstreamExecutorService; } + + public ThreadPoolExecutor getSendExecutorService() { + return sendExecutorService; + } + + public ThreadPoolExecutor getAckExecutorService() { + return ackExecutorService; + } + + public ThreadPoolExecutor getReplyExecutorService() { + return replyExecutorService; + } } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index 2718112dd7..84871b582c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -59,7 +59,28 @@ public class EventMeshTCPConfiguration extends CommonConfiguration { private int eventMeshTcpGlobalScheduler = 5; @ConfigFiled(field = "tcp.taskHandleExecutorPoolSize") - private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors(); + private int eventMeshTcpTaskHandleExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors(); + + @ConfigFiled(field = "tcp.sendExecutorPoolSize") + private int eventMeshTcpMsgSendExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors(); + + @ConfigFiled(field = "tcp.replyExecutorPoolSize") + private int eventMeshTcpMsgReplyExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors(); + + @ConfigFiled(field = "tcp.ackExecutorPoolSize") + private int eventMeshTcpMsgAckExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors(); + + @ConfigFiled(field = "tcp.taskHandleExecutorQueueSize") + private int eventMeshTcpTaskHandleExecutorQueueSize = 10000; + + @ConfigFiled(field = "tcp.sendExecutorQueueSize") + private int eventMeshTcpMsgSendExecutorQueueSize = 10000; + + @ConfigFiled(field = "tcp.replyExecutorQueueSize") + private int eventMeshTcpMsgReplyExecutorQueueSize = 10000; + + @ConfigFiled(field = "tcp.ackExecutorQueueSize") + private int eventMeshTcpMsgAckExecutorQueueSize = 10000; @ConfigFiled(field = "tcp.msgDownStreamExecutorPoolSize") private int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java index f6ac3d73ca..ab101ce8cc 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java @@ -167,12 +167,20 @@ private void closeSession(Session session) throws Exception { session.setSessionState(SessionState.CLOSED); - if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) { - cleanClientGroupWrapperByCloseSub(session); - } else if (EventMeshConstants.PURPOSE_PUB.equals(session.getClient().getPurpose())) { - cleanClientGroupWrapperByClosePub(session); - } else { - log.error("client purpose config is error:{}", session.getClient().getPurpose()); + final String clientGroup = session.getClient().getGroup(); + if (!lockMap.containsKey(clientGroup)) { + lockMap.putIfAbsent(clientGroup, new Object()); + } + synchronized (lockMap.get(clientGroup)) { + if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) { + cleanClientGroupWrapperByCloseSub(session); + } else if (EventMeshConstants.PURPOSE_PUB.equals( + session.getClient().getPurpose())) { + cleanClientGroupWrapperByClosePub(session); + } else { + log.error("client purpose config is error:{}", + session.getClient().getPurpose()); + } } if (session.getContext() != null) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java index e6fce72cbd..913ac31036 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventMeshRebalanceService.java @@ -20,7 +20,6 @@ import org.apache.eventmesh.common.EventMeshThreadFactory; import org.apache.eventmesh.common.ThreadPoolFactory; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; -import org.apache.eventmesh.runtime.util.EventMeshUtil; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -67,7 +66,7 @@ public void shutdown() { log.info("rebalance service shutdown......"); } - public void printRebalanceThreadPoolState() { - EventMeshUtil.printState((ThreadPoolExecutor) serviceRebalanceScheduler); + public int getRebalanceThreadPoolQueueSize() { + return ((ThreadPoolExecutor) serviceRebalanceScheduler).getQueue().size(); } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java index 1808a5c3cb..5ca235e64c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -126,8 +127,16 @@ public void start() throws Exception { }), delay, period, TimeUnit.MILLISECONDS); - monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> { - eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState(); + monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> { + appLogger.info("{TaskHandle:{},Send:{},Ack:{},Reply:{},Push:{},Scheduler:{},Rebalance:{}}", + eventMeshTCPServer.getTcpThreadPoolGroup().getTaskHandleExecutorService().getQueue().size(), + eventMeshTCPServer.getTcpThreadPoolGroup().getSendExecutorService().getQueue().size(), + eventMeshTCPServer.getTcpThreadPoolGroup().getAckExecutorService().getQueue().size(), + eventMeshTCPServer.getTcpThreadPoolGroup().getReplyExecutorService().getQueue().size(), + eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService().getQueue().size(), + ((ThreadPoolExecutor) eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()).getQueue().size(), + eventMeshTCPServer.getEventMeshRebalanceService().getRebalanceThreadPoolQueueSize()); + eventMeshTCPServer.getTcpRetryer().printState(); // monitor retry queue size @@ -137,7 +146,6 @@ public void start() throws Exception { EventMeshConstants.PROTOCOL_TCP, MonitorMetricConstants.RETRY_QUEUE_SIZE, tcpSummaryMetrics.getRetrySize()); - }, 10, PRINT_THREADPOOLSTATE_INTERVAL, TimeUnit.SECONDS); log.info("EventMeshTcpMonitor started......"); }