cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kous...@apache.org
Subject git commit: updated refs/heads/master to 269a4ef
Date Mon, 04 Nov 2013 09:23:55 GMT
Updated Branches:
  refs/heads/master 99ead3419 -> 269a4ef11


CLOUDSTACK-4855: Throttle based on the # of outstanding requests to the directly managed HV
host (direct agents)
CloudStack sends requests to directly managed HV hosts (direct agents) using the direct agent
thread pool. The size of the pool is determined by the global config direct.agent.pool.size,
which defaults to 500.

Currently there is no restriction on the number of threads a direct agent can use from this
shared thread pool to send requests to the host. This is fine as long as the host is responding
to requests in a reasonable amount of time. But if there is a considerable delay in getting a
response, the thread remains blocked for that much time. As more commands are sent to the slow
host, threads keep getting blocked. This can eventually lead to a situation where requests to
healthy hosts cannot be processed as there are not enough free threads.

The problem being addressed here is to localize the impact of a few bad hosts, so that the
entire management server is not affected.

One such way is to throttle based on the # of outstanding requests on a per-host basis. The
outstanding requests to a host will be a % of the direct agent pool size. This is configurable
via direct.agent.thread.cap. The default value is 0.1, or 10%; a value of 1 would mean the old
behavior where there is no upper cap. This will ensure that the impacted host will be bound
by an upper cap on the number of threads it can use to process requests, and cannot use the
entire pool.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/269a4ef1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/269a4ef1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/269a4ef1

Branch: refs/heads/master
Commit: 269a4ef11ee151fa408a7dd1f2e69cd1f7f05191
Parents: 99ead34
Author: Koushik Das <koushik@apache.org>
Authored: Wed Oct 30 15:32:01 2013 +0530
Committer: Koushik Das <koushik@apache.org>
Committed: Mon Nov 4 14:52:26 2013 +0530

----------------------------------------------------------------------
 .../com/cloud/agent/manager/AgentAttache.java   |  5 ++-
 .../cloud/agent/manager/AgentManagerImpl.java   | 28 +++++++++++------
 .../cloud/agent/manager/DirectAgentAttache.java | 33 +++++++++++++++++++-
 3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
index ff35255..9c87812 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentAttache.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.log4j.Logger;
@@ -107,7 +108,8 @@ public abstract class AgentAttache {
     protected Long _currentSequence;
     protected Status _status = Status.Connecting;
     protected boolean _maintenance;
-    protected long                                    _nextSequence;
+    protected long _nextSequence;
+    protected AtomicInteger _outstandingTaskCount;
 
     protected AgentManagerImpl _agentMgr;
 
@@ -131,6 +133,7 @@ public abstract class AgentAttache {
         _requests = new LinkedList<Request>();
         _agentMgr = agentMgr;
         _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48;
+        _outstandingTaskCount = new AtomicInteger(0);
     }
 
     public synchronized long getNextSequence() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
index 3e684cc..39d4702 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -159,24 +159,28 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager,
Handl
     protected ScheduledExecutorService _directAgentExecutor;
     protected ScheduledExecutorService _monitorExecutor;
 
+    private int _directAgentThreadCap;
+
     protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine();
     private final Map<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007);
 
     @Inject ResourceManager _resourceMgr;
 
-    protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>(Integer.class,
"workers", "Advance", "5",
+    protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>(Integer.class,
"workers", "Advanced", "5",
             "Number of worker threads handling remote agent connections.", false);
-    protected final ConfigKey<Integer> Port = new ConfigKey<Integer>(Integer.class,
"port", "Advance", "8250", "Port to listen on for remote agent connections.", false);
-    protected final ConfigKey<Integer> PingInterval = new ConfigKey<Integer>(Integer.class,
"ping.interval", "Advance", "60",
+    protected final ConfigKey<Integer> Port = new ConfigKey<Integer>(Integer.class,
"port", "Advanced", "8250", "Port to listen on for remote agent connections.", false);
+    protected final ConfigKey<Integer> PingInterval = new ConfigKey<Integer>(Integer.class,
"ping.interval", "Advanced", "60",
             "Interval to send application level pings to make sure the connection is still
working", false);
-    protected final ConfigKey<Float> PingTimeout = new ConfigKey<Float>(Float.class,
"ping.timeout", "Advance", "2.5",
+    protected final ConfigKey<Float> PingTimeout = new ConfigKey<Float>(Float.class,
"ping.timeout", "Advanced", "2.5",
             "Multiplier to ping.interval before announcing an agent has timed out", true);
-    protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>(Integer.class,
"alert.wait", "Advance", "1800",
+    protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>(Integer.class,
"alert.wait", "Advanced", "1800",
             "Seconds to wait before alerting on a disconnected agent", true);
-    protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>(Integer.class,
"direct.agent.load.size", "Advance", "16",
+    protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>(Integer.class,
"direct.agent.load.size", "Advanced", "16",
             "The number of direct agents to load each time", false);
-    protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>(Integer.class,
"direct.agent.pool.size", "Advance", "500",
+    protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>(Integer.class,
"direct.agent.pool.size", "Advanced", "500",
             "Default size for DirectAgentPool", false);
+    protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<Float>(Float.class,
"direct.agent.thread.cap", "Advanced", "0.1",
+            "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used
as upper thread cap for a single direct agent to process requests", false);
 
     @Override
     public boolean configure(final String name, final Map<String, Object> params) throws
ConfigurationException {
@@ -202,10 +206,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager,
Handl
         _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this);
         s_logger.info("Listening on " + Port.value() + " with " + Workers.value() + " workers");
 
-        
         _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(),
new NamedThreadFactory("DirectAgent"));
         s_logger.debug("Created DirectAgentAttache pool with size: " + DirectAgentPoolSize.value());
-        
+        _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value())
+ 1; // add 1 to always make the value > 0
+
         _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));
 
         return true;
@@ -1422,6 +1426,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager,
Handl
         return _directAgentExecutor;
     }
 
+    public int getDirectAgentThreadCap() {
+        return _directAgentThreadCap;
+    }
+
     public Long getAgentPingTime(long agentId) {
         return _pingMap.get(agentId);
     }
@@ -1568,7 +1576,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager,
Handl
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, Wait,
AlertWait, DirectAgentLoadSize, DirectAgentPoolSize};
+        return new ConfigKey<?>[] {Workers, Port, PingInterval, PingTimeout, Wait,
AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, DirectAgentThreadCap};
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/269a4ef1/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
index 7d3f765..0b6a011 100755
--- a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
@@ -132,6 +132,11 @@ public class DirectAgentAttache extends AgentAttache {
         @Override
         protected synchronized void runInContext() {
             try {
+                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap())
{
+                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached
maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
+                    return;
+                }
+
                 ServerResource resource = _resource;
 
                 if (resource != null) {
@@ -156,6 +161,8 @@ public class DirectAgentAttache extends AgentAttache {
                 }
             } catch (Exception e) {
                 s_logger.warn("Unable to complete the ping task", e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
             }
         }
     }
@@ -168,10 +175,32 @@ public class DirectAgentAttache extends AgentAttache {
             _req = req;
         }
 
+        private void bailout() {
+            long seq = _req.getSequence();
+            try {
+                Command[] cmds = _req.getCommands();
+                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
+                for (Command cmd : cmds) {
+                    Answer answer = new Answer(cmd, false, "Bailed out as maximum oustanding
task limit reached");
+                    answers.add(answer);
+                }
+                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
+                processAnswers(seq, resp);
+            } catch (Exception e) {
+                s_logger.warn(log(seq, "Exception caught in bailout "), e);
+            }
+        }
+
         @Override
         protected void runInContext() {
             long seq = _req.getSequence();
             try {
+                if (_outstandingTaskCount.incrementAndGet() > _agentMgr.getDirectAgentThreadCap())
{
+                    s_logger.warn("Task execution for direct attache(" + _id + ") has reached
maximum outstanding limit(" + _agentMgr.getDirectAgentThreadCap() + "), bailing out");
+                    bailout();
+                    return;
+                }
+
                 ServerResource resource = _resource;
                 Command[] cmds = _req.getCommands();
                 boolean stopOnError = _req.stopOnError();
@@ -186,7 +215,7 @@ public class DirectAgentAttache extends AgentAttache {
                         if (resource != null) {
                             answer = resource.executeRequest(cmds[i]);
                             if(answer == null) {
-                            	s_logger.warn("Resource returned null answer!");
+                                s_logger.warn("Resource returned null answer!");
                                 answer = new Answer(cmds[i], false, "Resource returned null
answer");
                             }
                         } else {
@@ -213,6 +242,8 @@ public class DirectAgentAttache extends AgentAttache {
                 processAnswers(seq, resp);
             } catch (Exception e) {
                 s_logger.warn(log(seq, "Exception caught "), e);
+            } finally {
+                _outstandingTaskCount.decrementAndGet();
             }
         }
     }


Mime
View raw message