helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: Creating a separate threadpool to handle batchMessages
Date Mon, 03 Apr 2017 19:31:03 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x fab5423f1 -> 70a585aca


Creating a separate threadpool to handle batchMessages


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/70a585ac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/70a585ac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/70a585ac

Branch: refs/heads/helix-0.6.x
Commit: 70a585aca1302aff767a91c59040ad9c94439323
Parents: fab5423
Author: kishoreg <kishoreg@apache.org>
Authored: Mon Apr 3 00:10:20 2017 -0700
Committer: kishoreg <kishoreg@apache.org>
Committed: Mon Apr 3 00:10:20 2017 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 24 +++++++++++++++-----
 1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/70a585ac/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index d68b272..8d3fea1 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -119,6 +119,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
+  
+  /**
+   * separate executor for executing batch messages
+   */
+  private final ExecutorService _batchMessageExecutorService;
+
 
   /* Resources whose configuration for dedicate thread pool has been checked.*/
   final Set<String> _resourcesThreadpoolChecked;
@@ -126,6 +132,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   // timer for schedule timeout tasks
   final Timer _timer;
 
+
   public HelixTaskExecutor() {
     this(new ParticipantStatusMonitor(false, null));
   }
@@ -135,6 +142,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
+    _batchMessageExecutorService = Executors.newCachedThreadPool();
     _resourcesThreadpoolChecked =
         Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
@@ -261,12 +269,16 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   ExecutorService findExecutorServiceForMsg(Message message) {
     ExecutorService executorService = _executorMap.get(message.getMsgType());
     if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
-      String resourceName = message.getResourceName();
-      if (resourceName != null) {
-        String key = message.getMsgType() + "." + resourceName;
-        if (_executorMap.containsKey(key)) {
-          LOG.info("Find per-resource thread pool with key: " + key);
-          executorService = _executorMap.get(key);
+      if(message.getBatchMessageMode() == true) {
+        executorService = _batchMessageExecutorService;
+      } else {
+        String resourceName = message.getResourceName();
+        if (resourceName != null) {
+          String key = message.getMsgType() + "." + resourceName;
+          if (_executorMap.containsKey(key)) {
+            LOG.info("Find per-resource thread pool with key: " + key);
+            executorService = _executorMap.get(key);
+          }
         }
       }
     }


Mime
View raw message