incubator-cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alena1...@apache.org
Subject [6/6] git commit: CS-16592: process handleConnectedAgent in a separate thread pool
Date Fri, 02 Nov 2012 17:53:40 GMT
CS-16592: process handleConnectedAgent in a separate thread pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/a5077968
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/a5077968
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/a5077968

Branch: refs/heads/master
Commit: a5077968db6d32108d3332523014e1fd3646e451
Parents: 3948d7d
Author: Alena Prokharchyk <alena.prokharchyk@citrix.com>
Authored: Wed Oct 24 09:54:38 2012 -0700
Committer: Alena Prokharchyk <alena.prokharchyk@citrix.com>
Committed: Fri Nov 2 10:47:14 2012 -0700

----------------------------------------------------------------------
 .../com/cloud/agent/manager/AgentManagerImpl.java  |   57 +++++++++++----
 1 files changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/a5077968/server/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java
index 892e405..d7edd45 100755
--- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -219,6 +219,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
     protected AgentMonitor _monitor = null;
 
     protected ExecutorService _executor;
+    protected ThreadPoolExecutor _connectExecutor;
     
     protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine();
     
@@ -274,7 +275,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
         registerForHostEvents(_monitor, true, true, false);
 
         _executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("AgentTaskPool"));
-
+        
+        _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool"));
+        //allow core threads to time out even when there are no items in the queue
+        _connectExecutor.allowCoreThreadTimeOut(true);
+ 
         _connection = new NioServer("AgentManager", _port, workers + 10, this);
 
         s_logger.info("Listening on " + _port + " with " + workers + " workers");
@@ -828,6 +834,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
                 }
             }
         }
+        
+        _connectExecutor.shutdownNow();
         return true;
     }
 
@@ -1206,6 +1214,35 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
             }
         }
     }
+    
+    protected class HandleAgentConnectTask implements Runnable {
+        Link _link;
+        Command[] _cmds;
+        Request _request;
+
+        HandleAgentConnectTask(Link link, final Command[] cmds, final Request request) {
+            _link = link;
+            _cmds = cmds;
+            _request = request;
+        }
+
+        @Override
+        public void run() {
+            _request.logD("Processing the first command ");
+            StartupCommand[] startups = new StartupCommand[_cmds.length];
+            for (int i = 0; i < _cmds.length; i++) {
+                startups[i] = (StartupCommand) _cmds[i];
+            }
+            AgentAttache attache = handleConnectedAgent(_link, startups, _request);
+            if (attache == null) {
+                s_logger.warn("Unable to create attache for agent: " + _request);
+            }
+        }
+    }
+    
+    protected void connectAgent(Link link, final Command[] cmds, final Request request) {
+        _connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request));
+    }
 
     public class AgentHandler extends Task {
         public AgentHandler(Task.Type type, Link link, byte[] data) {
@@ -1218,21 +1255,13 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
             Command cmd = cmds[0];
             boolean logD = true;
 
-            Response response = null;
             if (attache == null) {
-            	request.logD("Processing the first command ");
             	if (!(cmd instanceof StartupCommand)) {
             		s_logger.warn("Throwing away a request because it came through as the first
command on a connect: " + request);
-            		return;
-            	}
-
-            	StartupCommand[] startups = new StartupCommand[cmds.length];
-            	for (int i = 0; i < cmds.length; i++) {
-            		startups[i] = (StartupCommand) cmds[i];
-            	}
-            	attache = handleConnectedAgent(link, startups, request);
-            	if (attache == null) {
-            		s_logger.warn("Unable to create attache for agent: " + request);
+            	} else {
+            	    //submit the task for execution
+            	    request.logD("Scheduling the first command ");
+            	    connectAgent(link, cmds, request);
             	}
             	return;
             }
@@ -1331,7 +1360,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
Manager {
                 answers[i] = answer;
             }
 
-            response = new Response(request, answers, _nodeId, attache.getId());
+            Response response = new Response(request, answers, _nodeId, attache.getId());
             if (s_logger.isDebugEnabled()) {
                 if (logD) {
                     s_logger.debug("SeqA " + attache.getId() + "-" + response.getSequence()
+ ": Sending " + response);


Mime
View raw message