cloudstack-commits mailing list archives

From ahu...@apache.org
Subject [20/36] Commit to try something on removing getZone
Date Fri, 06 Sep 2013 22:38:50 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
new file mode 100755
index 0000000..3b24e94
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -0,0 +1,1419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.ejb.Local;
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+
+import org.apache.cloudstack.framework.config.ConfigDepot;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.ConfigValue;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.CancelCommand;
+import com.cloud.agent.api.ChangeAgentAnswer;
+import com.cloud.agent.api.ChangeAgentCommand;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.PropagateResourceEventCommand;
+import com.cloud.agent.api.ScheduleHostScanTaskCommand;
+import com.cloud.agent.api.TransferAgentCommand;
+import com.cloud.agent.transport.Request;
+import com.cloud.agent.transport.Request.Version;
+import com.cloud.agent.transport.Response;
+import com.cloud.cluster.ClusterManager;
+import com.cloud.cluster.ClusterManagerListener;
+import com.cloud.cluster.ClusterServicePdu;
+import com.cloud.cluster.ClusteredAgentRebalanceService;
+import com.cloud.cluster.ManagementServerHost;
+import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner;
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
+import com.cloud.cluster.dao.ManagementServerHostDao;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.Status.Event;
+import com.cloud.resource.ServerResource;
+import com.cloud.serializer.GsonHelper;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Profiler;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.SearchCriteria2;
+import com.cloud.utils.db.SearchCriteriaService;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.nio.Link;
+import com.cloud.utils.nio.Task;
+
+@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class })
+public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
+    final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
+    private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor"));
+    private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
+
+    public final static long STARTUP_DELAY = 5000;
+    public final static long SCAN_INTERVAL = 90000; // 90 seconds; it takes 60 sec for XenServer to fail a login
+    public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
+    protected Set<Long> _agentToTransferIds = new HashSet<Long>();
+
+    Gson _gson;
+
+    @Inject
+    protected ClusterManager _clusterMgr = null;
+
+    protected HashMap<String, SocketChannel> _peers;
+    protected HashMap<String, SSLEngine> _sslEngines;
+    private final Timer _timer = new Timer("ClusteredAgentManager Timer");
+
+    @Inject
+    protected ManagementServerHostDao _mshostDao;
+    @Inject
+    protected HostTransferMapDao _hostTransferDao;
+
+    // @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class)
+    @Inject protected List<AgentLoadBalancerPlanner> _lbPlanners;
+
+    @Inject ConfigurationDao _configDao;
+    @Inject
+    ConfigDepot _configDepot;
+
+    protected ClusteredAgentManagerImpl() {
+        super();
+    }
+
+    protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false",
+            "Enable agent load balancing between management server nodes", true);
+    protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7",
+            "What percentage of the agents can be held by one management server before load balancing happens", true);
+    protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
+            "How many agents to connect to in each round", true);
+    protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90",
+        "Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
+
+    protected ConfigValue<Boolean> _agentLBEnabled;
+    protected ConfigValue<Double> _connectedAgentsThreshold;
+    protected ConfigValue<Integer> _loadSize;
+    protected ConfigValue<Integer> _directAgentScanInterval;
+
+    @Override
+    public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
+        _peers = new HashMap<String, SocketChannel>(7);
+        _sslEngines = new HashMap<String, SSLEngine>(7);
+        _nodeId = ManagementServerNode.getManagementServerId();
+
+        s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId);
+
+        _loadSize = _configDepot.get(LoadSize);
+        _directAgentScanInterval = _configDepot.get(ScanInterval);
+        _agentLBEnabled = _configDepot.get(EnableLB);
+        _connectedAgentsThreshold = _configDepot.get(ConnectedAgentThreshold);
+
+        ClusteredAgentAttache.initialize(this);
+
+        _clusterMgr.registerListener(this);
+        _clusterMgr.registerDispatcher(new ClusterDispatcher());
+        
+        _gson = GsonHelper.getGson();
+
+        return super.configure(name, xmlParams);
+    }
+
+    @Override
+    public boolean start() {
+        if (!super.start()) {
+            return false;
+        }
+        _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval.value());
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval.value() + " seconds");
+        }
+
+        // schedule transfer scan executor - if agent LB is enabled
+        if (isAgentRebalanceEnabled()) {
+            s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        return true;
+    }
+
+    public void scheduleHostScanTask() {
+        _timer.schedule(new DirectAgentScanTimerTask(), 0);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Scheduled a direct agent scan task");
+        }
+    }
+
+    private void runDirectAgentScanTimerTask() {
+        scanDirectAgentToLoad();
+    }
+
+    private void scanDirectAgentToLoad() {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Begin scanning directly connected hosts");
+        }
+
+        // For self-managed agents, compute the cutoff time after which a host is considered disconnected (ping timeout).
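+        // ">> 10" divides by 1024 as a cheap approximation of a milliseconds-to-seconds
+        // conversion (host ping timestamps are presumably stored in seconds).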
+        long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout();
+        List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId);
+        List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
+        hosts.addAll(appliances);
+
+        if (!hosts.isEmpty()) {
+            s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them...");
+            for (HostVO host : hosts) {
+                try {
+                    AgentAttache agentattache = findAttache(host.getId());
+                    if (agentattache != null) {
+                        // already loaded, skip
+                        if (agentattache.forForward()) {
+                            if (s_logger.isInfoEnabled()) {
+                                s_logger.info(host + " is detected down, but we have a forward attache running; disconnecting it before loading the host");
+                            }
+                            removeAgent(agentattache, Status.Disconnected);
+                        } else {
+                            continue;
+                        }
+                    }
+
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
+                    }
+                    loadDirectlyConnectedHost(host, false);
+                } catch (Throwable e) {
+                    s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ",e);
+                }
+            }
+        }
+
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("End scanning directly connected hosts");
+        }
+    }
+
+    private class DirectAgentScanTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            try {
+                runDirectAgentScanTimerTask();
+            } catch (Throwable e) {
+                s_logger.error("Unexpected exception " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public Task create(Task.Type type, Link link, byte[] data) {
+        return new ClusteredAgentHandler(type, link, data);
+    }
+
+    protected AgentAttache createAttache(long id) {
+        s_logger.debug("create forwarding ClusteredAgentAttache for " + id);
+        final AgentAttache attache = new ClusteredAgentAttache(this, id);
+        AgentAttache old = null;
+        synchronized (_agents) {
+            old = _agents.get(id);
+            _agents.put(id, attache);
+        }
+        if (old != null) {
+            old.disconnect(Status.Removed);
+        }
+        return attache;
+    }
+
+    @Override
+    protected AgentAttache createAttacheForConnect(HostVO host, Link link) {
+        s_logger.debug("create ClusteredAgentAttache for " + host.getId());
+        final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), link, host.isInMaintenanceStates());
+        link.attach(attache);
+        AgentAttache old = null;
+        synchronized (_agents) {
+            old = _agents.get(host.getId());
+            _agents.put(host.getId(), attache);
+        }
+        if (old != null) {
+            old.disconnect(Status.Removed);
+        }
+        return attache;
+    }
+
+    @Override
+    protected AgentAttache createAttacheForDirectConnect(HostVO host, ServerResource resource) {
+//        if (resource instanceof DummySecondaryStorageResource) {
+//            return new DummyAttache(this, host.getId(), false);
+//        }
+        s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
+        final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), _nodeId, resource, host.isInMaintenanceStates(), this);
+        AgentAttache old = null;
+        synchronized (_agents) {
+            old = _agents.get(host.getId());
+            _agents.put(host.getId(), attache);
+        }
+        if (old != null) {
+            old.disconnect(Status.Removed);
+        }
+        return attache;
+    }
+
+    @Override
+    protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState, boolean removeAgent) {
+        return handleDisconnect(attache, event, false, true, removeAgent);
+    }
+
+    @Override
+    protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) {
+        return handleDisconnect(attache, event, true, true, true);
+    }
+
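+    // Central disconnect handler: "investigate" selects between the two base-class disconnect
+    // paths, and "broadcast" additionally notifies the peer management servers so they can drop
+    // any forwarding attaches they still hold for this agent.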
+    protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast, boolean removeAgent) {
+        boolean res;
+        if (!investigate) {
+            res = super.handleDisconnectWithoutInvestigation(agent, event, true, removeAgent);
+        } else {
+            res = super.handleDisconnectWithInvestigation(agent, event);
+        }
+
+        if (res && broadcast) {
+            notifyNodesInCluster(agent);
+        }
+        return res;
+    }
+
+    @Override
+    public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException {
+        if (event == Event.AgentDisconnected) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("Received agent disconnect event for host " + hostId);
+            }
+            AgentAttache attache = findAttache(hostId);
+            if (attache != null) {
+                //don't process disconnect if the host is being rebalanced
+                if (isAgentRebalanceEnabled()) {
+                    HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
+                    if (transferVO != null) {
+                        if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
+                            s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
+                                    + hostId +" as the host is being connected to " + _nodeId);
+                            return true;
+                        }
+                    }
+                }
+
+                //don't process disconnect if the disconnect came for the host via delayed cluster notification,
+                //but the host has already reconnected to the current management server
+                if (!attache.forForward()) {
+                    s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
+                            + hostId +" as the host is directly connected to the current management server " + _nodeId);
+                    return true;
+                }
+
+                return super.handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, false, true);
+            }
+
+            return true;
+        } else {
+            return super.executeUserRequest(hostId, event);
+        }
+    }
+
+    @Override
+    public boolean reconnect(final long hostId) {
+        Boolean result;
+        try {
+            result = propagateAgentEvent(hostId, Event.ShutdownRequested);
+            if (result != null) {
+                return result;
+            }
+        } catch (AgentUnavailableException e) {
+            s_logger.debug("Cannot propagate agent reconnect because the agent is not available", e);
+            return false;
+        }
+
+        return super.reconnect(hostId);
+    }
+
+    public void notifyNodesInCluster(AgentAttache attache) {
+        s_logger.debug("Notifying other nodes of to disconnect");
+        Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) };
+        _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
+    }
+
+    // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
+    public void notifyNodesInClusterToScheduleHostScanTask() {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Notifying other MS nodes to run host scan task");
+        }
+        Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() };
+        _clusterMgr.broadcast(0, _gson.toJson(cmds));
+    }
+
+    protected static void logT(byte[] bytes, final String msg) {
+        s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
+    }
+
+    protected static void logD(byte[] bytes, final String msg) {
+        s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
+    }
+
+    protected static void logI(byte[] bytes, final String msg) {
+        s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
+    }
+
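+    // Forwards a raw request/response frame to the peer management server identified by "peer".
+    // Retries up to five times, re-opening the cached channel after an IOException; gives up
+    // when no channel or SSLEngine can be obtained for the peer.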
+    public boolean routeToPeer(String peer, byte[] bytes) {
+        int i = 0;
+        SocketChannel ch = null;
+        SSLEngine sslEngine = null;
+        while (i++ < 5) {
+            ch = connectToPeer(peer, ch);
+            if (ch == null) {
+                try {
+                    logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString());
+                } catch (Exception e) {
+                    // best-effort log line; ignore parse failures
+                }
+                return false;
+            }
+            sslEngine = getSSLEngine(peer);
+            if (sslEngine == null) {
+                logD(bytes, "Unable to get SSLEngine of peer: " + peer);
+                return false;
+            }
+            try {
+                if (s_logger.isDebugEnabled()) {
+                    logD(bytes, "Routing to peer");
+                }
+                Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine);
+                return true;
+            } catch (IOException e) {
+                try {
+                    logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage());
+                } catch (Exception ex) {
+                    // best-effort log line; ignore parse failures
+                }
+            }
+        }
+        return false;
+    }
+
+    public String findPeer(long hostId) {
+        return getPeerName(hostId);
+    }
+
+    public SSLEngine getSSLEngine(String peerName) {
+        return _sslEngines.get(peerName);
+    }
+
+    public void cancel(String peerName, long hostId, long sequence, String reason) {
+        CancelCommand cancel = new CancelCommand(sequence, reason);
+        Request req = new Request(hostId, _nodeId, cancel, true);
+        req.setControl(true);
+        routeToPeer(peerName, req.getBytes());
+    }
+
+    public void closePeer(String peerName) {
+        synchronized (_peers) {
+            SocketChannel ch = _peers.get(peerName);
+            if (ch != null) {
+                try {
+                    ch.close();
+                } catch (IOException e) {
+                    s_logger.warn("Unable to close peer socket connection to " + peerName);
+                }
+            }
+            _peers.remove(peerName);
+            _sslEngines.remove(peerName);
+        }
+    }
+
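+    // Returns a cached blocking SocketChannel to the named peer, (re)connecting when there is no
+    // cached channel or the cached channel is the one that just failed (prevCh). The SSL
+    // handshake runs in client mode and the resulting SSLEngine is cached alongside the channel.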
+    public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) {
+        synchronized (_peers) {
+            SocketChannel ch = _peers.get(peerName);
+            SSLEngine sslEngine = null;
+            if (prevCh != null) {
+                try {
+                    prevCh.close();
+                } catch (Exception e) {
+                    // the previous channel already failed; ignore errors while closing it
+                }
+            }
+            if (ch == null || ch == prevCh) {
+                ManagementServerHost ms = _clusterMgr.getPeer(peerName);
+                if (ms == null) {
+                    s_logger.info("Unable to find peer: " + peerName);
+                    return null;
+                }
+                String ip = ms.getServiceIP();
+                InetAddress addr;
+                try {
+                    addr = InetAddress.getByName(ip);
+                } catch (UnknownHostException e) {
+                    throw new CloudRuntimeException("Unable to resolve " + ip);
+                }
+                try {
+                    ch = SocketChannel.open(new InetSocketAddress(addr, _port.value()));
+                    ch.configureBlocking(true); // make sure we are working at blocking mode
+                    ch.socket().setKeepAlive(true);
+                    ch.socket().setSoTimeout(60 * 1000);
+                    try {
+                        SSLContext sslContext = Link.initSSLContext(true);
+                        sslEngine = sslContext.createSSLEngine(ip, _port.value());
+                        sslEngine.setUseClientMode(true);
+
+                        Link.doHandshake(ch, sslEngine, true);
+                        s_logger.info("SSL: Handshake done");
+                    } catch (Exception e) {
+                        throw new IOException("SSL: Fail to init SSL! " + e);
+                    }
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip);
+                    }
+                    _peers.put(peerName, ch);
+                    _sslEngines.put(peerName, sslEngine);
+                } catch (IOException e) {
+                    s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e);
+                    return null;
+                }
+            }
+
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Found open channel for peer: " + peerName);
+            }
+            return ch;
+        }
+    }
+
+    public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) {
+        String peerName = getPeerName(hostId);
+        if (peerName == null) {
+            return null;
+        }
+
+        return connectToPeer(peerName, prevCh);
+    }
+
+    @Override
+    protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException {
+        assert (hostId != null) : "Who didn't check their id value?";
+        HostVO host = _hostDao.findById(hostId);
+        if (host == null) {
+            throw new AgentUnavailableException("Can't find the host ", hostId);
+        }
+
+        AgentAttache agent = findAttache(hostId);
+        if (agent == null) {
+            if (host.getStatus() == Status.Up && (host.getManagementServerId() != null && host.getManagementServerId() != _nodeId)) {
+                agent = createAttache(hostId);
+            }
+        }
+        if (agent == null) {
+            AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId);
+            ex.addProxyObject(_entityMgr.findById(Host.class, hostId).getUuid());
+            throw ex;
+        }
+
+        return agent;
+    }
+
+    @Override
+    public boolean stop() {
+        if (_peers != null) {
+            for (SocketChannel ch : _peers.values()) {
+                try {
+                    s_logger.info("Closing: " + ch.toString());
+                    ch.close();
+                } catch (IOException e) {
+                    // best-effort shutdown; ignore close failures
+                }
+            }
+        }
+        _timer.cancel();
+
+        //cancel all transfer tasks
+        s_transferExecutor.shutdownNow();
+        cleanupTransferMap(_nodeId);
+
+        return super.stop();
+    }
+
+    @Override
+    public void startDirectlyConnectedHosts() {
+        // Deliberately a no-op: direct agents are scanned and loaded periodically instead.
+        // The periodic scan may also pick up agents left over from other, crashed management servers.
+    }
+
+    public class ClusteredAgentHandler extends AgentHandler {
+
+        public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) {
+            super(type, link, data);
+        }
+
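+        // Routing rules for DATA frames:
+        // - from a server: control (cancel) commands are handled locally; requests that require
+        //   sequential execution are re-parsed and queued on the attache; everything else is
+        //   either routed through a Routable attache or sent directly.
+        // - from an agent: frames addressed to another management server are forwarded to that
+        //   peer; otherwise requests are processed locally and answers are matched to the
+        //   waiting attache.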
+        @Override
+        protected void doTask(final Task task) throws Exception {
+            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+            try {
+                if (task.getType() != Task.Type.DATA) {
+                    super.doTask(task);
+                    return;
+                }
+
+                final byte[] data = task.getData();
+                Version ver = Request.getVersion(data);
+                if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) {
+                    s_logger.warn("Wrong version for clustered agent request");
+                    super.doTask(task);
+                    return;
+                }
+
+                long hostId = Request.getAgentId(data);
+                Link link = task.getLink();
+
+                if (Request.fromServer(data)) {
+
+                    AgentAttache agent = findAttache(hostId);
+
+                    if (Request.isControl(data)) {
+                        if (agent == null) {
+                            logD(data, "No attache to process cancellation");
+                            return;
+                        }
+                        Request req = Request.parse(data);
+                        Command[] cmds = req.getCommands();
+                        CancelCommand cancel = (CancelCommand) cmds[0];
+                        if (s_logger.isDebugEnabled()) {
+                            logD(data, "Cancel request received");
+                        }
+                        agent.cancel(cancel.getSequence());
+                        final Long current = agent._currentSequence;
+                        //if the request is the current request, always have to trigger sending next request in sequence,
+                        //otherwise the agent queue will be blocked
+                        if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) {
+                            agent.sendNext(Request.getSequence(data));
+                        }
+                        return;
+                    }
+
+                    try {
+                        if (agent == null || agent.isClosed()) {
+                            throw new AgentUnavailableException("Unable to route to agent ", hostId);
+                        }
+
+                        if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) {
+                            // route it to the agent.
+                            // But we have to serialize these commands, so we
+                            // deserialize the request and send it through the agent attache.
+                            Request req = Request.parse(data);
+                            agent.send(req, null);
+                            return;
+                        } else {
+                            if (agent instanceof Routable) {
+                                Routable cluster = (Routable) agent;
+                                cluster.routeToAgent(data);
+                            } else {
+                                agent.send(Request.parse(data));
+                            }
+                            return;
+                        }
+                    } catch (AgentUnavailableException e) {
+                        logD(data, e.getMessage());
+                        cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage());
+                    }
+                } else {
+
+                    long mgmtId = Request.getManagementServerId(data);
+                    if (mgmtId != -1 && mgmtId != _nodeId) {
+                        routeToPeer(Long.toString(mgmtId), data);
+                        if (Request.requiresSequentialExecution(data)) {
+                            AgentAttache attache = (AgentAttache) link.attachment();
+                            if (attache != null) {
+                                attache.sendNext(Request.getSequence(data));
+                            } else if (s_logger.isDebugEnabled()) {
+                                logD(data, "No attache to process " + Request.parse(data).toString());
+                            }
+                        }
+                        return;
+                    } else {
+                        if (Request.isRequest(data)) {
+                            super.doTask(task);
+                        } else {
+                            // received an answer.
+                            final Response response = Response.parse(data);
+                            AgentAttache attache = findAttache(response.getAgentId());
+                            if (attache == null) {
+                                s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString());
+                                return;
+                            }
+                            if (!attache.processAnswers(response.getSequence(), response)) {
+                                s_logger.info("SeqA " + attache.getId() + "-" + response.getSequence() + ": Response is not processed: " + response.toString());
+                            }
+                        }
+                        return;
+                    }
+                }
+            } finally {
+                txn.close();
+            }
+        }
+    }
+
+    @Override
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+    }
+
+    @Override
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost vo : nodeList) {
+            s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
+            long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
+            _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);
+            s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid());
+            cleanupTransferMap(vo.getMsid());
+        }
+    }
+
+    @Override
+    public void onManagementNodeIsolated() {
+    }
+
+    @Override
+    public void removeAgent(AgentAttache attache, Status nextState) {
+        if (attache == null) {
+            return;
+        }
+
+        super.removeAgent(attache, nextState);
+    }
+
+    @Override
+    public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException {
+        boolean result = false;
+        if (event == Event.RequestAgentRebalance) {
+            return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
+        } else if (event == Event.StartAgentRebalance) {
+            try {
+                result = rebalanceHost(agentId, currentOwnerId, futureOwnerId);
+            } catch (Exception e) {
+                s_logger.warn("Unable to rebalance host id=" + agentId, e);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void scheduleRebalanceAgents() {
+        _timer.schedule(new AgentLoadBalancerTask(), 30000);
+    }
+
+    public class AgentLoadBalancerTask extends TimerTask {
+        protected volatile boolean cancelled = false;
+
+        public AgentLoadBalancerTask() {
+            s_logger.debug("Agent load balancer task created");
+        }
+
+        @Override
+        public synchronized boolean cancel() {
+            if (!cancelled) {
+                cancelled = true;
+                s_logger.debug("Agent load balancer task cancelled");
+                return super.cancel();
+            }
+            return true;
+        }
+
+        @Override
+        public synchronized void run() {
+            try {
+                if (!cancelled) {
+                    startRebalanceAgents();
+                    if (s_logger.isInfoEnabled()) {
+                        s_logger.info("The agent load balancer task is now being cancelled");
+                    }
+                    cancelled = true;
+                }
+            } catch (Throwable e) {
+                s_logger.error("Unexpected exception " + e.toString(), e);
+            }
+        }
+    }
+
+    public void startRebalanceAgents() {
+        s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents");
+        List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up);
+        SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
+        sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
+        sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
+        List<HostVO> allManagedAgents = sc.list();
+
+        int avLoad = 0;
+
+        if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) {
+            avLoad = allManagedAgents.size() / allMS.size();
+        } else {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size());
+            }
+            return;
+        }
+
+        if (avLoad == 0) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("As calculated average load is less than 1, rounding it to 1");
+            }
+            avLoad = 1;
+        }
+
+        for (ManagementServerHostVO node : allMS) {
+            if (node.getMsid() != _nodeId) {
+
+                List<HostVO> hostsToRebalance = new ArrayList<HostVO>();
+                for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
+                    hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad);
+                    if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
+                        break;
+                    } else {
+                        s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid());
+                    }
+                }
+
+
+                if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
+                    s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid());
+                    for (HostVO host : hostsToRebalance) {
+                        long hostId = host.getId();
+                        s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
+                        boolean result = true;
+
+                        if (_hostTransferDao.findById(hostId) != null) {
+                            s_logger.warn("Somebody else is already rebalancing host id: " + hostId);
+                            continue;
+                        }
+
+                        HostTransferMapVO transfer = null;
+                        try {
+                            transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
+                            Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
+                            if (answer == null) {
+                                s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
+                                result = false;
+                            }
+                        } catch (Exception ex) {
+                            s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
+                            result = false;
+                        } finally {
+                            if (transfer != null) {
+                                HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
+                                if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) {
+                                    if (s_logger.isDebugEnabled()) {
+                                        s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
+                                    }
+                                    //just remove the mapping (if exists) as nothing was done on the peer management server yet
+                                    _hostTransferDao.remove(transfer.getId());
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    s_logger.debug("Found no hosts to rebalance from the management server " + node.getMsid());
+                }
+            }
+        }
+    }
+
+    private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) {
+        TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
+        Commands commands = new Commands(Command.OnError.Stop);
+        commands.addCommand(transfer);
+
+        Command[] cmds = commands.toCommands();
+
+        try {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
+            }
+            String peerName = Long.toString(peer);
+            String cmdStr = _gson.toJson(cmds);
+            String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
+            Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
+            return answers;
+        } catch (Exception e) {
+            s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
+            return null;
+        }
+    }
+
+    public String getPeerName(long agentHostId) {
+
+        HostVO host = _hostDao.findById(agentHostId);
+        if (host != null && host.getManagementServerId() != null) {
+            if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
+                return null;
+            }
+
+            return Long.toString(host.getManagementServerId());
+        }
+        return null;
+    }
+
+
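+    // Propagates an agent event to the peer management server owning the agent. Returns null
+    // when the agent is owned by this node (nothing to propagate); otherwise returns the result
+    // of the first answer from the peer.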
+    public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
+        final String msPeer = getPeerName(agentId);
+        if (msPeer == null) {
+            return null;
+        }
+
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
+        }
+        Command[] cmds = new Command[1];
+        cmds[0] = new ChangeAgentCommand(agentId, event);
+
+        String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
+        if (ansStr == null) {
+            throw new AgentUnavailableException(agentId);
+        }
+        
+        Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
+
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Result for agent change is " + answers[0].getResult());
+        }
+
+        return answers[0].getResult();
+    }
+
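+    // The transfer scan task walks _agentToTransferIds and, for each host, either abandons the
+    // rebalance (transfer timed out, this node no longer owns the host, or the future owner is
+    // not Up) or submits a RebalanceTask once the attache's request and listener queues drain.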
+    private Runnable getTransferScanTask() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId);
+                    }
+                    synchronized (_agentToTransferIds) {
+                        if (_agentToTransferIds.size() > 0) {
+                            s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
+                            //for (Long hostId : _agentToTransferIds) {
+                            for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
+                                Long hostId = iterator.next();
+                                AgentAttache attache = findAttache(hostId);
+
+                                // if the thread:
+                                // 1) timed out waiting for the host to reconnect
+                                // 2) recipient management server is not active any more
+                                // 3) if the management server doesn't own the host any more
+                                // remove the host from re-balance list and delete from op_host_transfer DB
+                                // no need to do anything with the real attache as we haven't modified it yet
+                                Date cutTime = DateUtil.currentGMTTime();
+                                HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
+
+                                if (transferMap == null) {
+                                    s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
+                                    iterator.remove();
+                                    _hostTransferDao.completeAgentTransfer(hostId);
+                                    continue;
+                                }
+
+                                if (transferMap.getInitialOwner() != _nodeId || attache == null || attache.forForward()) {
+                                    s_logger.debug("Management server " + _nodeId + " doesn't own host id=" + hostId + " any more, skipping rebalance for the host");
+                                    iterator.remove();
+                                    _hostTransferDao.completeAgentTransfer(hostId);
+                                    continue;
+                                }
+
+                                ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
+                                if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
+                                    s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
+                                    iterator.remove();
+                                    _hostTransferDao.completeAgentTransfer(hostId);
+                                    continue;
+                                }
+
+                                if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
+                                    iterator.remove();
+                                    try {
+                                        _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner()));
+                                    } catch (RejectedExecutionException ex) {
+                                        s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution");
+                                        continue;
+                                    }
+
+                                } else {
+                                    s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
+                                }
+                            }
+                        } else {
+                            if (s_logger.isTraceEnabled()) {
+                                s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
+                            }
+                        }
+                    }
+
+                } catch (Throwable e) {
+                    s_logger.error("Problem with the clustered agent transfer scan check!", e);
+                }
+            }
+        };
+    }
+
+
+    private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) {
+        s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
+        synchronized (_agentToTransferIds) {
+            return  _agentToTransferIds.add(hostId);
+        }
+    }
+
+
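+    // Rebalancing is driven from both sides: the current owner puts the attache into transfer
+    // mode, asks the future owner to start, and forwards any queued requests on completion,
+    // while the future owner silently disconnects its forwarding attache and loads the host
+    // as a directly connected one.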
+    protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException {
+
+        boolean result = true;
+        if (currentOwnerId == _nodeId) {
+            if (!startRebalance(hostId)) {
+                s_logger.debug("Failed to start agent rebalancing");
+                finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
+                return false;
+            }
+            try {
+                Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
+                if (answer == null || !answer[0].getResult()) {
+                    result = false;
+                }
+
+            } catch (Exception ex) {
+                s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex);
+                result = false;
+            }
+
+            if (result) {
+                s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId);
+                finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted);
+            } else {
+                s_logger.warn("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId);
+                finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
+            }
+
+        } else if (futureOwnerId == _nodeId) {
+            HostVO host = _hostDao.findById(hostId);
+            try {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Disconnecting host " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification");
+                }
+
+                AgentAttache attache = findAttache(hostId);
+                if (attache != null) {
+                    result = handleDisconnect(attache, Event.AgentDisconnected, false, false, true);
+                }
+
+                if (result) {
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
+                    }
+                    result = loadDirectlyConnectedHost(host, true);
+                } else {
+                    s_logger.warn("Failed to disconnect " + host.getId() + "(" + host.getName() +
+                            " as a part of rebalance process without notification");
+                }
+
+            } catch (Exception ex) {
+                s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex);
+                result = false;
+            }
+
+            if (result) {
+                s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
+            } else {
+                s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
+            }
+        }
+
+        return result;
+    }
+
+
+    protected void finishRebalance(final long hostId, long futureOwnerId, Event event) {
+
+        boolean success = (event == Event.RebalanceCompleted);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event);
+        }
+
+        AgentAttache attache = findAttache(hostId);
+        if (!(attache instanceof ClusteredAgentAttache)) {
+            s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already");
+            _hostTransferDao.completeAgentTransfer(hostId);
+            return;
+        }
+
+        ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
+
+        if (success) {
+
+            //1) Set transfer mode to false - so the agent can start processing requests normally
+            forwardAttache.setTransferMode(false);
+
+            //2) Get all transfer requests and route them to peer
+            Request requestToTransfer = forwardAttache.getRequestToTransfer();
+            while (requestToTransfer != null) {
+                s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
+                boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
+                if (!routeResult) {
+                    logD(requestToTransfer.getBytes(), "Failed to route request to peer");
+                }
+
+                requestToTransfer = forwardAttache.getRequestToTransfer();
+            }
+
+            s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance to " + futureOwnerId);
+
+        } else {
+            failRebalance(hostId);
+        }
+
+        s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance");
+        _hostTransferDao.completeAgentTransfer(hostId);
+    }
+
+    protected void failRebalance(final long hostId) {
+        try {
+            s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId);
+            _hostTransferDao.completeAgentTransfer(hostId);
+            handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed, true, true);
+        } catch (Exception ex) {
+            s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup");
+        }
+    }
+
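+    // Puts the host into transfer mode: the direct attache is disconnected (only when its
+    // request and listener queues are empty) and replaced with a forwarding attache that
+    // buffers requests until the transfer completes.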
+    protected boolean startRebalance(final long hostId) {
+        HostVO host = _hostDao.findById(hostId);
+
+        if (host == null || host.getRemoved() != null) {
+            s_logger.warn("Unable to find host record, fail start rebalancing process");
+            return false;
+        }
+
+        synchronized (_agents) {
+            ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
+            if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
+                handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
+                ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
+                if (forwardAttache == null) {
+                    s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
+                    return false;
+                }
+                s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
+                forwardAttache.setTransferMode(true);
+                _agents.put(hostId, forwardAttache);
+            } else {
+                if (attache == null) {
+                    s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
+                } else {
+                    s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
+                }
+                return false;
+            }
+        }
+        _hostTransferDao.startAgentTransfer(hostId);
+        return true;
+    }
+
+    protected void cleanupTransferMap(long msId) {
+        List<HostTransferMapVO> hostsJoiningCluster = _hostTransferDao.listHostsJoiningCluster(msId);
+
+        for (HostTransferMapVO hostJoiningCluster : hostsJoiningCluster) {
+            _hostTransferDao.remove(hostJoiningCluster.getId());
+        }
+
+        List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId);
+        for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
+            _hostTransferDao.remove(hostLeavingCluster.getId());
+        }
+    }
+
+
+    protected class RebalanceTask implements Runnable {
+        Long hostId = null;
+        Long currentOwnerId = null;
+        Long futureOwnerId = null;
+
+
+        public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) {
+            this.hostId = hostId;
+            this.currentOwnerId = currentOwnerId;
+            this.futureOwnerId = futureOwnerId;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Rebalancing host id=" + hostId);
+                }
+                rebalanceHost(hostId, currentOwnerId, futureOwnerId);
+            } catch (Exception e) {
+                s_logger.warn("Unable to rebalance host id=" + hostId, e);
+            }
+        }
+    }
+
+    private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
+        }
+
+        try {
+            scheduleHostScanTask();
+        } catch (Exception e) {
+            // Scheduling a host scan task on a peer MS is a best-effort operation during host add;
+            // the regular host scan happens at fixed intervals anyway, so any exception thrown here is swallowed.
+            s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName()
+                    + ", ignoring as regular host scan happens at fixed interval anyway", e);
+            return null;
+        }
+
+        Answer[] answers = new Answer[1];
+        answers[0] = new Answer(cmd, true, null);
+        return _gson.toJson(answers);
+    }
+
+    public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
+        Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue);
+        for (Command cmd : cmds) {
+            commands.addCommand(cmd);
+        }
+        return send(hostId, commands);
+    }
+
+
+    protected class ClusterDispatcher implements ClusterManager.Dispatcher {
+        @Override
+        public String getName() {
+            return "ClusterDispatcher";
+        }
+
+        @Override
+        public String dispatch(ClusterServicePdu pdu) {
+
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
+            }
+
+            Command[] cmds = null;
+            try {
+                cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
+            } catch (Throwable e) {
+                assert (false);
+                s_logger.error("Exception in gson decoding : ", e);
+            }
+
+            if (cmds == null || cmds.length == 0) {
+                return null;
+            }
+
+            if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) {  //intercepted
+                ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
+                }
+                boolean result = false;
+                try {
+                    result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Result is " + result);
+                    }
+
+                } catch (AgentUnavailableException e) {
+                    s_logger.warn("Agent is unavailable", e);
+                    return null;
+                }
+
+                Answer[] answers = new Answer[1];
+                answers[0] = new ChangeAgentAnswer(cmd, result);
+                return _gson.toJson(answers);
+            } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
+                TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
+                }
+                boolean result = false;
+                try {
+                    result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Result is " + result);
+                    }
+
+                } catch (AgentUnavailableException e) {
+                    s_logger.warn("Agent is unavailable", e);
+                    return null;
+                } catch (OperationTimedoutException e) {
+                    s_logger.warn("Operation timed out", e);
+                    return null;
+                }
+                Answer[] answers = new Answer[1];
+                answers[0] = new Answer(cmd, result, null);
+                return _gson.toJson(answers);
+            } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
+                PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
+
+                s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
+
+                boolean result = false;
+                try {
+                    result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent());
+                    s_logger.debug("Result is " + result);
+                } catch (AgentUnavailableException ex) {
+                    s_logger.warn("Agent is unavailable", ex);
+                    return null;
+                }
+
+                Answer[] answers = new Answer[1];
+                answers[0] = new Answer(cmd, result, null);
+                return _gson.toJson(answers);
+            } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
+                ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
+                String response = handleScheduleHostScanTaskCommand(cmd);
+                return response;
+            }
+
+            try {
+                long startTick = System.currentTimeMillis();
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
+                }
+
+                Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
+                if (answers != null) {
+                    String jsonReturn = _gson.toJson(answers);
+
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
+                                " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
+                    }
+
+                    return jsonReturn;
+                } else {
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
+                                " in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
+                    }
+                }
+            } catch (AgentUnavailableException e) {
+                s_logger.warn("Agent is unavailable", e);
+            } catch (OperationTimedoutException e) {
+                s_logger.warn("Timed Out", e);
+            }
+
+            return null;
+        }
+
+    }
+
+    public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
+        return executeUserRequest(agentId, event);
+    }
+
+    public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
+        return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
+    }
+
+    public boolean isAgentRebalanceEnabled() {
+        return _agentLBEnabled.value();
+    }
+
+    private ClusteredAgentRebalanceService _rebalanceService;
+
+    boolean _agentLbHappened = false;
+    public void agentrebalance() {
+        Profiler profilerAgentLB = new Profiler();
+        profilerAgentLB.start();
+        // The agent LB task is scheduled and executed only once, and only when the fraction of
+        // connected agents managed by this node exceeds _connectedAgentsThreshold
+        if (_agentLBEnabled.value() && !_agentLbHappened) {
+            SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
+            sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
+            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
+            List<HostVO> allManagedRoutingAgents = sc.list();
+
+            sc = SearchCriteria2.create(HostVO.class);
+            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
+            List<HostVO> allAgents = sc.list();
+            double allHostsCount = allAgents.size();
+            double managedHostsCount = allManagedRoutingAgents.size();
+            if (allHostsCount > 0.0) {
+                double load = managedHostsCount / allHostsCount;
+                if (load >= _connectedAgentsThreshold.value()) {
+                    s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
+                    _rebalanceService.scheduleRebalanceAgents();
+                    _agentLbHappened = true;
+                } else {
+                    s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
+                }
+            }
+        }
+        profilerAgentLB.stop();
+    }
+    
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        ConfigKey<?>[] keys = super.getConfigKeys();
+        
+        List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>();
+        keysLst.addAll(Arrays.asList(keys));
+        keysLst.add(EnableLB);
+        keysLst.add(ConnectedAgentThreshold);
+        keysLst.add(LoadSize);
+        keysLst.add(ScanInterval);
+        return keysLst.toArray(new ConfigKey<?>[keysLst.size()]);
+    }
+}

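The ClusterDispatcher above follows one shape throughout: decode the PDU's JSON package into a Command[], intercept a handful of single-command types locally, and serialize the Answer[] back for the peer. A minimal standalone sketch of that decode/intercept/encode loop, assuming Gson 2.x on the classpath (Cmd and Ans are illustrative stand-ins, not the CloudStack command types):

    import com.google.gson.Gson;

    // Illustrative stand-ins for the agent command/answer types.
    class Cmd { String name; Cmd(String name) { this.name = name; } }
    class Ans { boolean result; Ans(boolean result) { this.result = result; } }

    public class DispatchSketch {
        private static final Gson GSON = new Gson();

        // Decode a JSON package into commands, intercept a known type,
        // and return the serialized answers.
        static String dispatch(String jsonPackage) {
            Cmd[] cmds;
            try {
                cmds = GSON.fromJson(jsonPackage, Cmd[].class);
            } catch (RuntimeException e) {
                return null; // undecodable PDU: bail out instead of NPEing below
            }
            if (cmds == null || cmds.length == 0) {
                return null;
            }
            if (cmds.length == 1 && "ChangeAgent".equals(cmds[0].name)) {
                Ans[] answers = { new Ans(true) }; // handled locally
                return GSON.toJson(answers);
            }
            return null; // anything else would be forwarded to the agent
        }

        public static void main(String[] args) {
            System.out.println(dispatch(GSON.toJson(new Cmd[] { new Cmd("ChangeAgent") })));
        }
    }

Returning null for an undecodable or empty package keeps the dispatcher from dereferencing a null command array; the caller simply treats a null return as "no answer from this peer".
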
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
new file mode 100755
index 0000000..9012433
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.transport.Request;
+import com.cloud.agent.transport.Response;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.UnsupportedVersionException;
+import com.cloud.resource.ServerResource;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
+    private final ClusteredAgentManagerImpl _mgr;
+    private final long _nodeId;
+
+    public ClusteredDirectAgentAttache(AgentManagerImpl agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
+        super(agentMgr, id, resource, maintenance, mgr);
+        _mgr = mgr;
+        _nodeId = mgmtId;
+    }
+
+    @Override
+    public void routeToAgent(byte[] data) throws AgentUnavailableException {
+        Request req;
+        try {
+            req = Request.parse(data);
+        } catch (ClassNotFoundException e) {
+            throw new CloudRuntimeException("Unable to route to the agent", e);
+        } catch (UnsupportedVersionException e) {
+            throw new CloudRuntimeException("Unable to route to the agent", e);
+        }
+
+        if (req instanceof Response) {
+            super.process(((Response) req).getAnswers());
+        } else {
+            super.send(req);
+        }
+    }
+
+    @Override
+    public boolean processAnswers(long seq, Response response) {
+        long mgmtId = response.getManagementServerId();
+        if (mgmtId != -1 && mgmtId != _nodeId) {
+            _mgr.routeToPeer(Long.toString(mgmtId), response.getBytes());
+            if (response.executeInSequence()) {
+                sendNext(response.getSequence());
+            }
+            return true;
+        } else {
+            return super.processAnswers(seq, response);
+        }
+    }
+    
+}

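ClusteredDirectAgentAttache.processAnswers() encodes a simple ownership rule: a response stamped with another management server's id is forwarded to that peer, anything else is processed locally. A minimal sketch of that rule with illustrative names (Peers stands in for the clustered manager's peer-routing API, which this sketch does not reproduce):

    public class RoutingSketch {
        interface Peers {
            void routeToPeer(String peerId, byte[] data);
        }

        // Returns true when the response was forwarded to its owning peer,
        // false when the caller should process it locally.
        static boolean routeOrHandle(long responseMsId, long localNodeId, byte[] bytes, Peers peers) {
            if (responseMsId != -1 && responseMsId != localNodeId) {
                peers.routeToPeer(Long.toString(responseMsId), bytes);
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            Peers printer = new Peers() {
                @Override
                public void routeToPeer(String peerId, byte[] data) {
                    System.out.println("forwarded to peer " + peerId);
                }
            };
            System.out.println(routeOrHandle(42L, 7L, new byte[0], printer)); // forwarded -> true
            System.out.println(routeOrHandle(7L, 7L, new byte[0], printer));  // local -> false
        }
    }
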
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/ConnectedAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ConnectedAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/ConnectedAgentAttache.java
new file mode 100755
index 0000000..e5d2867
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/ConnectedAgentAttache.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.transport.Request;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.host.Status;
+import com.cloud.utils.nio.Link;
+
+/**
+ * ConnectedAgentAttache represents an agent that maintains a direct socket connection to this management server.
+ */
+public class ConnectedAgentAttache extends AgentAttache {
+    private static final Logger s_logger = Logger.getLogger(ConnectedAgentAttache.class);
+
+    protected Link _link;
+
+    public ConnectedAgentAttache(AgentManagerImpl agentMgr, final long id, final Link link, boolean maintenance) {
+        super(agentMgr, id, maintenance);
+        _link = link;
+    }
+
+    @Override
+    public synchronized void send(Request req) throws AgentUnavailableException {
+        try {
+            _link.send(req.toBytes());
+        } catch (ClosedChannelException e) {
+            throw new AgentUnavailableException("Channel is closed", _id);
+        }
+    }
+
+    @Override
+    public synchronized boolean isClosed() {
+        return _link == null;
+    }
+
+    @Override
+    public void disconnect(final Status state) {
+        synchronized (this) {
+            s_logger.debug("Processing Disconnect.");
+            if (_link != null) {
+                _link.close();
+                _link.terminated();
+            }
+            _link = null;
+        }
+        cancelAllCommands(state, true);
+        _requests.clear();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        try {
+            ConnectedAgentAttache that = (ConnectedAgentAttache) obj;
+            return super.equals(obj) && _link == that._link && _link != null;
+        } catch (ClassCastException e) {
+            assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? ";
+        return false;
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            assert _link == null : "Duh...Says you....Forgot to call disconnect()!";
+            synchronized (this) {
+                if (_link != null) {
+                    s_logger.warn("Lost attache " + _id);
+                    disconnect(Status.Alert);
+                }
+            }
+        } finally {
+            super.finalize();
+        }
+    }
+
+}

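The synchronization pattern in ConnectedAgentAttache is worth calling out: send() and isClosed() synchronize on the attache, a closed channel surfaces to callers as AgentUnavailableException, and disconnect() nulls the link so isClosed() flips to true. A minimal sketch of that contract (Link and AgentUnavailable here are toy stand-ins, not com.cloud.utils.nio.Link or the CloudStack exception):

    import java.nio.channels.ClosedChannelException;

    public class LinkSketch {
        static class AgentUnavailable extends Exception {
            AgentUnavailable(String msg, long id) { super(msg + " (agent " + id + ")"); }
        }

        static class Link {
            private boolean open = true;
            void send(byte[] data) throws ClosedChannelException {
                if (!open) throw new ClosedChannelException();
            }
            void close() { open = false; }
        }

        private final long id = 1L;
        private Link link = new Link();

        // Sends are serialized on the attache; a dead link becomes
        // an "agent unavailable" condition for the caller.
        synchronized void send(byte[] data) throws AgentUnavailable {
            if (link == null) throw new AgentUnavailable("Channel is closed", id);
            try {
                link.send(data);
            } catch (ClosedChannelException e) {
                throw new AgentUnavailable("Channel is closed", id);
            }
        }

        synchronized void disconnect() {
            if (link != null) link.close();
            link = null; // isClosed() is simply link == null
        }

        synchronized boolean isClosed() {
            return link == null;
        }
    }
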
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
new file mode 100755
index 0000000..5b5d8d2
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/DirectAgentAttache.java
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.CronCommand;
+import com.cloud.agent.api.PingCommand;
+import com.cloud.agent.api.StartupAnswer;
+import com.cloud.agent.transport.Request;
+import com.cloud.agent.transport.Response;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.host.Status;
+import com.cloud.host.Status.Event;
+import com.cloud.resource.ServerResource;
+
+public class DirectAgentAttache extends AgentAttache {
+    private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class);
+
+    ServerResource _resource;
+    List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
+    AgentManagerImpl _mgr;
+    long _seq = 0;
+
+    public DirectAgentAttache(AgentManagerImpl agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
+        super(agentMgr, id, maintenance);
+        _resource = resource;
+        _mgr = mgr;
+    }
+
+    @Override
+    public void disconnect(Status state) {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Processing disconnect " + _id);
+        }
+
+        for (ScheduledFuture<?> future : _futures) {
+            future.cancel(false);
+        }
+
+        synchronized(this) {
+            if( _resource != null ) {
+                _resource.disconnected();
+                _resource = null;
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof DirectAgentAttache)) {
+            return false;
+        }
+        return super.equals(obj);
+    }
+
+    @Override
+    public synchronized boolean isClosed() {
+        return _resource == null;
+    }
+
+    @Override
+    public void send(Request req) throws AgentUnavailableException {
+        req.logD("Executing: ", true);
+        if (req instanceof Response) {
+            Response resp = (Response)req;
+            Answer[] answers = resp.getAnswers();
+            if (answers != null && answers[0] instanceof StartupAnswer) {
+                StartupAnswer startup = (StartupAnswer)answers[0];
+                int interval = startup.getPingInterval();
+                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+            }
+        } else {
+            Command[] cmds = req.getCommands();
+            if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
+                _agentMgr.getDirectAgentPool().execute(new Task(req));
+            } else {
+                CronCommand cmd = (CronCommand)cmds[0];
+                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
+            }
+        }
+    }
+
+    @Override
+    public void process(Answer[] answers) {
+        if (answers != null && answers[0] instanceof StartupAnswer) {
+            StartupAnswer startup = (StartupAnswer)answers[0];
+            int interval = startup.getPingInterval();
+            s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval );
+            _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";
+            synchronized (this) {
+                if (_resource != null) {
+                    s_logger.warn("Lost attache for " + _id);
+                    disconnect(Status.Alert);
+                }
+            }
+        } finally {
+            super.finalize();
+        }
+    }
+
+    protected class PingTask implements Runnable {
+        @Override
+        public synchronized void run() {
+            try {
+                ServerResource resource = _resource;
+
+                if (resource != null) {
+                    PingCommand cmd = resource.getCurrentStatus(_id);
+                    if (cmd == null) {
+                        s_logger.warn("Unable to get current status on " + _id);
+                        _mgr.disconnectWithInvestigation(DirectAgentAttache.this, Event.AgentDisconnected);
+                        return;
+                    }
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Ping from " + _id);
+                    }
+                    long seq = _seq++;
+
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(_id, -1, cmd, false).toString());
+                    }
+
+                    _mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});
+                } else {
+                    s_logger.debug("Unable to send ping because agent is disconnected " + _id);
+                }
+            } catch (Exception e) {
+                s_logger.warn("Unable to complete the ping task", e);
+            }
+        }
+    }
+
+
+    protected class Task implements Runnable {
+        Request _req;
+
+        public Task(Request req) {
+            _req = req;
+        }
+
+        @Override
+        public void run() {
+            long seq = _req.getSequence();
+            try {
+                ServerResource resource = _resource;
+                Command[] cmds = _req.getCommands();
+                boolean stopOnError = _req.stopOnError();
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Executing request"));
+                }
+                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
+                for (int i = 0; i < cmds.length; i++) {
+                    Answer answer = null;
+                    try {
+                        if (resource != null) {
+                            answer = resource.executeRequest(cmds[i]);
+                            if (answer == null) {
+                                s_logger.warn("Resource returned null answer!");
+                                answer = new Answer(cmds[i], false, "Resource returned null answer");
+                            }
+                        } else {
+                            answer = new Answer(cmds[i], false, "Agent is disconnected");
+                        }
+                    } catch (Exception e) {
+                        s_logger.warn(log(seq, "Exception Caught while executing command"), e);
+                        answer = new Answer(cmds[i], false, e.toString());
+                    }
+                    answers.add(answer);
+                    if (!answer.getResult() && stopOnError) {
+                        if (i < cmds.length - 1 && s_logger.isDebugEnabled()) {
+                            s_logger.debug(log(seq, "Cancelling because one of the answers is false and it is stop on error."));
+                        }
+                        break;
+                    }
+                }
+
+                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(log(seq, "Response Received: "));
+                }
+
+                processAnswers(seq, resp);
+            } catch (Exception e) {
+                s_logger.warn(log(seq, "Exception caught "), e);
+            }
+        }
+    }
+
+}

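DirectAgentAttache drives its own heartbeat: when a StartupAnswer arrives it schedules a fixed-rate PingTask at the interval the agent requested, keeps the returned future, and cancels it on disconnect. A minimal sketch of that lifecycle with a plain ScheduledExecutorService (pool size and the log output are illustrative, not the attache's real pool):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class PingSketch {
        private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        private final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();

        // Called when the agent's StartupAnswer names its ping interval.
        void onStartupAnswer(final long hostId, int pingIntervalSeconds) {
            futures.add(pool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // the real attache asks the resource for a PingCommand here
                    // and disconnects the host if none comes back
                    System.out.println("ping " + hostId);
                }
            }, pingIntervalSeconds, pingIntervalSeconds, TimeUnit.SECONDS));
        }

        // Called on disconnect: stop all recurring work for this agent.
        void disconnect() {
            for (ScheduledFuture<?> f : futures) {
                f.cancel(false);
            }
        }
    }

Passing false to cancel() lets an in-flight ping finish rather than interrupting it, which matches how the attache tears down its futures.
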
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/DummyAttache.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/DummyAttache.java b/engine/orchestration/src/com/cloud/agent/manager/DummyAttache.java
new file mode 100755
index 0000000..182c1b8
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/DummyAttache.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import com.cloud.agent.transport.Request;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.host.Status;
+
+
+public class DummyAttache extends AgentAttache {
+
+    public DummyAttache(AgentManagerImpl agentMgr, long id, boolean maintenance) {
+        super(agentMgr, id, maintenance);
+    }
+
+    @Override
+    public void disconnect(Status state) {
+    }
+
+    @Override
+    protected boolean isClosed() {
+        return false;
+    }
+
+    @Override
+    public void send(Request req) throws AgentUnavailableException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/Routable.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/Routable.java b/engine/orchestration/src/com/cloud/agent/manager/Routable.java
new file mode 100644
index 0000000..c8eb185
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/Routable.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import com.cloud.exception.AgentUnavailableException;
+
+public interface Routable {
+    /**
+     * Routes this data directly to the agent.
+     *
+     * @param data serialized request bytes
+     * @throws AgentUnavailableException if the agent cannot be reached
+     */
+    void routeToAgent(byte[] data) throws AgentUnavailableException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/engine/orchestration/src/com/cloud/agent/manager/SynchronousListener.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/SynchronousListener.java b/engine/orchestration/src/com/cloud/agent/manager/SynchronousListener.java
new file mode 100755
index 0000000..3698705
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/agent/manager/SynchronousListener.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.manager;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.Listener;
+import com.cloud.agent.api.AgentControlAnswer;
+import com.cloud.agent.api.AgentControlCommand;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.StartupCommand;
+import com.cloud.host.Host;
+import com.cloud.host.Status;
+import com.cloud.utils.Profiler;
+
+public class SynchronousListener implements Listener {
+    private static final Logger s_logger = Logger.getLogger(SynchronousListener.class);
+	
+    protected Answer[] _answers;
+    protected boolean _disconnected;
+    protected String _peer;
+
+    public SynchronousListener(Listener listener) {
+        _answers = null;
+        _peer = null;
+    }
+    
+    public void setPeer(String peer) {
+        _peer = peer;
+    }
+    
+    public String getPeer() {
+        return _peer;
+    }
+    
+    public Answer[] getAnswers() {
+        return _answers;
+    }
+    
+    @Override
+    public boolean isRecurring() {
+        return false;
+    }
+    
+    public boolean isDisconnected() {
+        return _disconnected;
+    }
+
+    @Override
+    public synchronized boolean processAnswers(long agentId, long seq, Answer[] resp) {
+        _answers = resp;
+        notifyAll();
+        return true;
+    }
+    
+    @Override
+    public synchronized boolean processDisconnect(long agentId, Status state) {
+    	if(s_logger.isTraceEnabled())
+    		s_logger.trace("Agent disconnected, agent id: " + agentId + ", state: " + state + ". Will notify waiters");
+    	
+        _disconnected = true;
+        notifyAll();
+        return true;
+    }
+    
+    @Override
+    public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) {
+    }
+
+    @Override
+    public boolean processCommands(long agentId, long seq, Command[] req) {
+        return false;
+    }
+    
+    @Override
+    public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
+        return null;
+    }
+    
+    public Answer[] waitFor() throws InterruptedException {
+        return waitFor(-1);
+    }
+    
+    public synchronized Answer[] waitFor(int s) throws InterruptedException {
+        if (_disconnected) {
+            return null;
+        }
+        
+        if (_answers != null) {
+            return _answers;
+        }
+
+        Profiler profiler = new Profiler();
+        profiler.start();
+        if (s <= 0) {
+            wait();
+        } else {
+            int ms = s * 1000;
+            wait(ms);
+        }
+        profiler.stop();
+        
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Synchronous command completed, wait time: " + profiler.getDuration() + " ms, answer: " +
+                    (_answers != null && _answers.length > 0 ? _answers[0].toString() : "null"));
+        }
+        return _answers;
+    }
+    
+    @Override
+    public boolean processTimeout(long agentId, long seq) {
+        return true;
+    }
+
+    @Override
+    public int getTimeout() {
+        return -1;
+    }
+    
+}

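SynchronousListener is a classic monitor: waitFor() blocks until processAnswers() or processDisconnect() calls notifyAll() under the same lock, with an optional timeout. A minimal sketch of that contract, with the wait wrapped in a condition loop to tolerate spurious wakeups (Answer is reduced to a String stand-in, and the class names are illustrative):

    public class WaitSketch {
        private String answer;          // stand-in for Answer[]
        private boolean disconnected;

        // Producer side: an answer arrived, wake all waiters.
        synchronized void processAnswer(String a) {
            answer = a;
            notifyAll();
        }

        // Producer side: the agent went away, wake all waiters.
        synchronized void processDisconnect() {
            disconnected = true;
            notifyAll();
        }

        // Consumer side: block until an answer, a disconnect, or a timeout.
        // seconds <= 0 means wait forever, like the original waitFor(-1).
        synchronized String waitFor(int seconds) throws InterruptedException {
            long deadline = System.currentTimeMillis() + seconds * 1000L;
            while (answer == null && !disconnected) {
                if (seconds <= 0) {
                    wait();
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        break; // timed out
                    }
                    wait(remaining);
                }
            }
            return disconnected ? null : answer;
        }
    }
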
