cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [13/36] Commit to try something on removing getZone
Date Fri, 06 Sep 2013 22:38:43 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
deleted file mode 100755
index bacd4d9..0000000
--- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ /dev/null
@@ -1,1421 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.ejb.Local;
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.log4j.Logger;
-
-import com.google.gson.Gson;
-
-import org.apache.cloudstack.framework.config.ConfigDepot;
-import org.apache.cloudstack.framework.config.ConfigKey;
-import org.apache.cloudstack.framework.config.ConfigValue;
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.utils.identity.ManagementServerNode;
-
-import com.cloud.agent.AgentManager;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.CancelCommand;
-import com.cloud.agent.api.ChangeAgentAnswer;
-import com.cloud.agent.api.ChangeAgentCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.PropagateResourceEventCommand;
-import com.cloud.agent.api.ScheduleHostScanTaskCommand;
-import com.cloud.agent.api.TransferAgentCommand;
-import com.cloud.agent.transport.Request;
-import com.cloud.agent.transport.Request.Version;
-import com.cloud.agent.transport.Response;
-import com.cloud.api.ApiDBUtils;
-import com.cloud.cluster.ClusterManager;
-import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ClusterServicePdu;
-import com.cloud.cluster.ClusteredAgentRebalanceService;
-import com.cloud.cluster.ManagementServerHost;
-import com.cloud.cluster.ManagementServerHostVO;
-import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner;
-import com.cloud.cluster.agentlb.HostTransferMapVO;
-import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
-import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
-import com.cloud.cluster.dao.ManagementServerHostDao;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.host.Host;
-import com.cloud.host.HostVO;
-import com.cloud.host.Status;
-import com.cloud.host.Status.Event;
-import com.cloud.resource.ServerResource;
-import com.cloud.serializer.GsonHelper;
-import com.cloud.storage.resource.DummySecondaryStorageResource;
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.Profiler;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.SearchCriteria2;
-import com.cloud.utils.db.SearchCriteriaService;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.nio.Link;
-import com.cloud.utils.nio.Task;
-
-@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class })
-public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
-    final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
-    private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor"));
-    private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
-
-    public final static long STARTUP_DELAY = 5000;
-    public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
-    public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
-    protected Set<Long> _agentToTransferIds = new HashSet<Long>();
-
-    Gson _gson;
-
-    @Inject
-    protected ClusterManager _clusterMgr = null;
-
-    protected HashMap<String, SocketChannel> _peers;
-    protected HashMap<String, SSLEngine> _sslEngines;
-    private final Timer _timer = new Timer("ClusteredAgentManager Timer");
-
-    @Inject
-    protected ManagementServerHostDao _mshostDao;
-    @Inject
-    protected HostTransferMapDao _hostTransferDao;
-
-    // @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class)
-    @Inject protected List<AgentLoadBalancerPlanner> _lbPlanners;
-
-    @Inject ConfigurationDao _configDao;
-    @Inject
-    ConfigDepot _configDepot;
-
-    protected ClusteredAgentManagerImpl() {
-        super();
-    }
-
-    protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false",
-            "Enable agent load balancing between management server nodes", true);
-    protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7",
-            "What percentage of the agents can be held by one management server before load balancing happens", true);
-    protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
-            "How many agents to connect to in each round", true);
-    protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90",
-            "Interval between scans to load agents", false);
-    
-
-    protected ConfigValue<Boolean> _agentLBEnabled;
-    protected ConfigValue<Double> _connectedAgentsThreshold;
-    protected ConfigValue<Integer> _loadSize;
-    protected ConfigValue<Integer> _directAgentScanInterval;
-
-    @Override
-    public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
-        _peers = new HashMap<String, SocketChannel>(7);
-        _sslEngines = new HashMap<String, SSLEngine>(7);
-        _nodeId = ManagementServerNode.getManagementServerId();
-
-        s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId);
-
-        _loadSize = _configDepot.get(LoadSize);
-        _directAgentScanInterval = _configDepot.get(ScanInterval).setMultiplier(1000);
-        _agentLBEnabled = _configDepot.get(EnableLB);
-        _connectedAgentsThreshold = _configDepot.get(ConnectedAgentThreshold);
-
-        ClusteredAgentAttache.initialize(this);
-
-        _clusterMgr.registerListener(this);
-        _clusterMgr.registerDispatcher(new ClusterDispatcher());
-        
-        _gson = GsonHelper.getGson();
-
-        return super.configure(name, xmlParams);
-    }
-
-    @Override
-    public boolean start() {
-        if (!super.start()) {
-            return false;
-        }
-        _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval.value());
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval.value() + " seconds");
-        }
-
-        // schedule transfer scan executor - if agent LB is enabled
-        if (isAgentRebalanceEnabled()) {
-            s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
-                    TimeUnit.MILLISECONDS);
-        }
-
-        return true;
-    }
-
-    public void scheduleHostScanTask() {
-        _timer.schedule(new DirectAgentScanTimerTask(), 0);
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Scheduled a direct agent scan task");
-        }
-    }
-
-    private void runDirectAgentScanTimerTask() {
-        scanDirectAgentToLoad();
-    }
-
-    private void scanDirectAgentToLoad() {
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace("Begin scanning directly connected hosts");
-        }
-
-        // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
-        long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout();
-        List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId);
-        List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
-        hosts.addAll(appliances);
-
-        if (hosts != null && hosts.size() > 0) {
-            s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them...");
-            for (HostVO host : hosts) {
-                try {
-                    AgentAttache agentattache = findAttache(host.getId());
-                    if (agentattache != null) {
-                        // already loaded, skip
-                        if (agentattache.forForward()) {
-                            if (s_logger.isInfoEnabled()) {
-                                s_logger.info(host + " is detected down, but we have a forward attache running, disconnect this one before launching the host");
-                            }
-                            removeAgent(agentattache, Status.Disconnected);
-                        } else {
-                            continue;
-                        }
-                    }
-
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
-                    }
-                    loadDirectlyConnectedHost(host, false);
-                } catch (Throwable e) {
-                    s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ",e);
-                }
-            }
-        }
-
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace("End scanning directly connected hosts");
-        }
-    }
-
-    private class DirectAgentScanTimerTask extends TimerTask {
-        @Override
-        public void run() {
-            try {
-                runDirectAgentScanTimerTask();
-            } catch (Throwable e) {
-                s_logger.error("Unexpected exception " + e.getMessage(), e);
-            }
-        }
-    }
-
-    @Override
-    public Task create(Task.Type type, Link link, byte[] data) {
-        return new ClusteredAgentHandler(type, link, data);
-    }
-
-    protected AgentAttache createAttache(long id) {
-        s_logger.debug("create forwarding ClusteredAgentAttache for " + id);
-        final AgentAttache attache = new ClusteredAgentAttache(this, id);
-        AgentAttache old = null;
-        synchronized (_agents) {
-            old = _agents.get(id);
-            _agents.put(id, attache);
-        }
-        if (old != null) {
-            old.disconnect(Status.Removed);
-        }
-        return attache;
-    }
-
-    @Override
-    protected AgentAttache createAttacheForConnect(HostVO host, Link link) {
-        s_logger.debug("create ClusteredAgentAttache for " + host.getId());
-        final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), link, host.isInMaintenanceStates());
-        link.attach(attache);
-        AgentAttache old = null;
-        synchronized (_agents) {
-            old = _agents.get(host.getId());
-            _agents.put(host.getId(), attache);
-        }
-        if (old != null) {
-            old.disconnect(Status.Removed);
-        }
-        return attache;
-    }
-
-    @Override
-    protected AgentAttache createAttacheForDirectConnect(HostVO host, ServerResource resource) {
-        if (resource instanceof DummySecondaryStorageResource) {
-            return new DummyAttache(this, host.getId(), false);
-        }
-        s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
-        final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), _nodeId, resource, host.isInMaintenanceStates(), this);
-        AgentAttache old = null;
-        synchronized (_agents) {
-            old = _agents.get(host.getId());
-            _agents.put(host.getId(), attache);
-        }
-        if (old != null) {
-            old.disconnect(Status.Removed);
-        }
-        return attache;
-    }
-
-    @Override
-    protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState, boolean removeAgent) {
-        return handleDisconnect(attache, event, false, true, removeAgent);
-    }
-
-    @Override
-    protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) {
-        return handleDisconnect(attache, event, true, true, true);
-    }
-
-    protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast, boolean removeAgent) {
-        boolean res;
-        if (!investigate) {
-            res = super.handleDisconnectWithoutInvestigation(agent, event, true, removeAgent);
-        } else {
-            res = super.handleDisconnectWithInvestigation(agent, event);
-        }
-
-		if (res) {
-			if (broadcast) {
-				notifyNodesInCluster(agent);
-			}
-			return true;
-		} else {
-			return false;
-		}
-    }
-
-    @Override
-    public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException {
-        if (event == Event.AgentDisconnected) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Received agent disconnect event for host " + hostId);
-            }
-            AgentAttache attache = findAttache(hostId);
-            if (attache != null) {
-                //don't process disconnect if the host is being rebalanced
-                if (isAgentRebalanceEnabled()) {
-                    HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
-                    if (transferVO != null) {
-                        if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
-                            s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
-                                    + hostId +" as the host is being connected to " + _nodeId);
-                            return true;
-                        }
-                    }
-                }
-
-                //don't process disconnect if the disconnect came for the host via delayed cluster notification,
-                //but the host has already reconnected to the current management server
-                if (!attache.forForward()) {
-                    s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id="
-                            + hostId +" as the host is directly connected to the current management server " + _nodeId);
-                    return true;
-                }
-
-                return super.handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, false, true);
-            }
-
-            return true;
-        } else {
-            return super.executeUserRequest(hostId, event);
-        }
-    }
-
-    @Override
-    public boolean reconnect(final long hostId) {
-        Boolean result;
-        try {
-            result = propagateAgentEvent(hostId, Event.ShutdownRequested);
-	        if (result != null) {
-	            return result;
-	        }
-        } catch (AgentUnavailableException e) {
-	        s_logger.debug("cannot propagate agent reconnect because agent is not available", e);
-	        return false;
-        }
-
-        return super.reconnect(hostId);
-    }
-
-    public void notifyNodesInCluster(AgentAttache attache) {
-        s_logger.debug("Notifying other nodes of to disconnect");
-        Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) };
-        _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
-    }
-
-    // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
-    public void notifyNodesInClusterToScheduleHostScanTask() {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Notifying other MS nodes to run host scan task");
-        }
-        Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() };
-        _clusterMgr.broadcast(0, _gson.toJson(cmds));
-    }
-
-    protected static void logT(byte[] bytes, final String msg) {
-        s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
-                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
-    }
-
-    protected static void logD(byte[] bytes, final String msg) {
-        s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
-                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
-    }
-
-    protected static void logI(byte[] bytes, final String msg) {
-        s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
-                + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
-    }
-
-    public boolean routeToPeer(String peer, byte[] bytes) {
-        int i = 0;
-        SocketChannel ch = null;
-        SSLEngine sslEngine = null;
-        while (i++ < 5) {
-            ch = connectToPeer(peer, ch);
-            if (ch == null) {
-                try {
-                    logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString());
-                } catch (Exception e) {
-                }
-                return false;
-            }
-            sslEngine = getSSLEngine(peer);
-            if (sslEngine == null) {
-                logD(bytes, "Unable to get SSLEngine of peer: " + peer);
-                return false;
-            }
-            try {
-                if (s_logger.isDebugEnabled()) {
-                    logD(bytes, "Routing to peer");
-                }
-                Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine);
-                return true;
-            } catch (IOException e) {
-                try {
-                    logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage());
-                } catch (Exception ex) {
-                }
-            }
-        }
-        return false;
-    }
-
-    public String findPeer(long hostId) {
-        return getPeerName(hostId);
-    }
-
-    public SSLEngine getSSLEngine(String peerName) {
-        return _sslEngines.get(peerName);
-    }
-
-    public void cancel(String peerName, long hostId, long sequence, String reason) {
-        CancelCommand cancel = new CancelCommand(sequence, reason);
-        Request req = new Request(hostId, _nodeId, cancel, true);
-        req.setControl(true);
-        routeToPeer(peerName, req.getBytes());
-    }
-
-    public void closePeer(String peerName) {
-        synchronized (_peers) {
-            SocketChannel ch = _peers.get(peerName);
-            if (ch != null) {
-                try {
-                    ch.close();
-                } catch (IOException e) {
-                    s_logger.warn("Unable to close peer socket connection to " + peerName);
-                }
-            }
-            _peers.remove(peerName);
-            _sslEngines.remove(peerName);
-        }
-    }
-
-    public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) {
-        synchronized (_peers) {
-            SocketChannel ch = _peers.get(peerName);
-            SSLEngine sslEngine = null;
-            if (prevCh != null) {
-                try {
-                    prevCh.close();
-                } catch (Exception e) {
-                }
-            }
-            if (ch == null || ch == prevCh) {
-                ManagementServerHost ms = _clusterMgr.getPeer(peerName);
-                if (ms == null) {
-                    s_logger.info("Unable to find peer: " + peerName);
-                    return null;
-                }
-                String ip = ms.getServiceIP();
-                InetAddress addr;
-                try {
-                    addr = InetAddress.getByName(ip);
-                } catch (UnknownHostException e) {
-                    throw new CloudRuntimeException("Unable to resolve " + ip);
-                }
-                try {
-                    ch = SocketChannel.open(new InetSocketAddress(addr, _port.value()));
-                    ch.configureBlocking(true); // make sure we are working at blocking mode
-                    ch.socket().setKeepAlive(true);
-                    ch.socket().setSoTimeout(60 * 1000);
-                    try {
-                        SSLContext sslContext = Link.initSSLContext(true);
-                        sslEngine = sslContext.createSSLEngine(ip, _port.value());
-                        sslEngine.setUseClientMode(true);
-
-                        Link.doHandshake(ch, sslEngine, true);
-                        s_logger.info("SSL: Handshake done");
-                    } catch (Exception e) {
-                        throw new IOException("SSL: Fail to init SSL! " + e);
-                    }
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip);
-                    }
-                    _peers.put(peerName, ch);
-                    _sslEngines.put(peerName, sslEngine);
-                } catch (IOException e) {
-                    s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e);
-                    return null;
-                }
-            }
-
-            if (s_logger.isTraceEnabled()) {
-                s_logger.trace("Found open channel for peer: " + peerName);
-            }
-            return ch;
-        }
-    }
-
-    public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) {
-        String peerName = getPeerName(hostId);
-        if (peerName == null) {
-            return null;
-        }
-
-        return connectToPeer(peerName, prevCh);
-    }
-
-    @Override
-    protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException {
-        assert (hostId != null) : "Who didn't check their id value?";
-        HostVO host = _hostDao.findById(hostId);
-        if (host == null) {
-            throw new AgentUnavailableException("Can't find the host ", hostId);
-        }
-
-        AgentAttache agent = findAttache(hostId);
-        if (agent == null) {
-            if (host.getStatus() == Status.Up && (host.getManagementServerId() != null && host.getManagementServerId() != _nodeId)) {
-                agent = createAttache(hostId);
-            }
-        }
-        if (agent == null) {
-        	AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId);
-            ex.addProxyObject(ApiDBUtils.findHostById(hostId).getUuid());
-            throw ex;
-        }
-
-        return agent;
-    }
-
-    @Override
-    public boolean stop() {
-        if (_peers != null) {
-            for (SocketChannel ch : _peers.values()) {
-                try {
-                    s_logger.info("Closing: " + ch.toString());
-                    ch.close();
-                } catch (IOException e) {
-                }
-            }
-        }
-        _timer.cancel();
-
-        //cancel all transfer tasks
-        s_transferExecutor.shutdownNow();
-        cleanupTransferMap(_nodeId);
-
-        return super.stop();
-    }
-
-    @Override
-    public void startDirectlyConnectedHosts() {
-        // override and let it be dummy for purpose, we will scan and load direct agents periodically.
-        // We may also pickup agents that have been left over from other crashed management server
-    }
-
-    public class ClusteredAgentHandler extends AgentHandler {
-
-        public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) {
-            super(type, link, data);
-        }
-
-        @Override
-        protected void doTask(final Task task) throws Exception {
-            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
-            try {
-                if (task.getType() != Task.Type.DATA) {
-                    super.doTask(task);
-                    return;
-                }
-
-                final byte[] data = task.getData();
-                Version ver = Request.getVersion(data);
-                if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) {
-                    s_logger.warn("Wrong version for clustered agent request");
-                    super.doTask(task);
-                    return;
-                }
-
-                long hostId = Request.getAgentId(data);
-                Link link = task.getLink();
-
-                if (Request.fromServer(data)) {
-
-                    AgentAttache agent = findAttache(hostId);
-
-                    if (Request.isControl(data)) {
-                        if (agent == null) {
-                            logD(data, "No attache to process cancellation");
-                            return;
-                        }
-                        Request req = Request.parse(data);
-                        Command[] cmds = req.getCommands();
-                        CancelCommand cancel = (CancelCommand) cmds[0];
-                        if (s_logger.isDebugEnabled()) {
-                            logD(data, "Cancel request received");
-                        }
-                        agent.cancel(cancel.getSequence());
-                        final Long current = agent._currentSequence;
-                        //if the request is the current request, always have to trigger sending next request in sequence,
-                        //otherwise the agent queue will be blocked
-                        if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) {
-                            agent.sendNext(Request.getSequence(data));
-                        }
-                        return;
-                    }
-
-                    try {
-                        if (agent == null || agent.isClosed()) {
-                            throw new AgentUnavailableException("Unable to route to agent ", hostId);
-                        }
-
-                        if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) {
-                            // route it to the agent.
-                            // But we have the serialize the control commands here so we have
-                            // to deserialize this and send it through the agent attache.
-                            Request req = Request.parse(data);
-                            agent.send(req, null);
-                            return;
-                        } else {
-                            if (agent instanceof Routable) {
-                                Routable cluster = (Routable) agent;
-                                cluster.routeToAgent(data);
-                            } else {
-                                agent.send(Request.parse(data));
-                            }
-                            return;
-                        }
-                    } catch (AgentUnavailableException e) {
-                        logD(data, e.getMessage());
-                        cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage());
-                    }
-                } else {
-
-                    long mgmtId = Request.getManagementServerId(data);
-                    if (mgmtId != -1 && mgmtId != _nodeId) {
-                        routeToPeer(Long.toString(mgmtId), data);
-                        if (Request.requiresSequentialExecution(data)) {
-                            AgentAttache attache = (AgentAttache) link.attachment();
-                            if (attache != null) {
-                                attache.sendNext(Request.getSequence(data));
-                            } else if (s_logger.isDebugEnabled()) {
-                                logD(data, "No attache to process " + Request.parse(data).toString());
-                            }
-                        }
-                        return;
-                    } else {
-                        if (Request.isRequest(data)) {
-                            super.doTask(task);
-                        } else {
-                            // received an answer.
-                            final Response response = Response.parse(data);
-                            AgentAttache attache = findAttache(response.getAgentId());
-                            if (attache == null) {
-                                s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString());
-                                return;
-                            }
-                            if (!attache.processAnswers(response.getSequence(), response)) {
-                                s_logger.info("SeqA " + attache.getId() + "-" + response.getSequence() + ": Response is not processed: " + response.toString());
-                            }
-                        }
-                        return;
-                    }
-                }
-            } finally {
-                txn.close();
-            }
-        }
-    }
-
-    @Override
-    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
-    }
-
-    @Override
-    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
-        for (ManagementServerHost vo : nodeList) {
-            s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
-            long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
-            _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);
-            s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid());
-            cleanupTransferMap(vo.getMsid());
-        }
-    }
-
-    @Override
-    public void onManagementNodeIsolated() {
-    }
-
-    @Override
-    public void removeAgent(AgentAttache attache, Status nextState) {
-        if (attache == null) {
-            return;
-        }
-
-        super.removeAgent(attache, nextState);
-    }
-
-    @Override
-    public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException {
-    	boolean result = false;
-        if (event == Event.RequestAgentRebalance) {
-            return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
-        } else if (event == Event.StartAgentRebalance) {
-            try {
-            	result = rebalanceHost(agentId, currentOwnerId, futureOwnerId);
-            } catch (Exception e) {
-                s_logger.warn("Unable to rebalance host id=" + agentId, e);
-            }
-        }
-        return result;
-    }
-
-    @Override
-    public void scheduleRebalanceAgents() {
-        _timer.schedule(new AgentLoadBalancerTask(), 30000);
-    }
-
-    public class AgentLoadBalancerTask extends TimerTask {
-        protected volatile boolean cancelled = false;
-
-        public AgentLoadBalancerTask() {
-            s_logger.debug("Agent load balancer task created");
-        }
-
-        @Override
-        public synchronized boolean cancel() {
-            if (!cancelled) {
-                cancelled = true;
-                s_logger.debug("Agent load balancer task cancelled");
-                return super.cancel();
-            }
-            return true;
-        }
-
-        @Override
-        public synchronized void run() {
-        	try {
-	            if (!cancelled) {
-	                startRebalanceAgents();
-	                if (s_logger.isInfoEnabled()) {
-	                    s_logger.info("The agent load balancer task is now being cancelled");
-	                }
-	                cancelled = true;
-	            }
-        	} catch(Throwable e) {
-        		s_logger.error("Unexpected exception " + e.toString(), e);
-        	}
-        }
-    }
-
-    public void startRebalanceAgents() {
-        s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents");
-        List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up);
-        SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
-        sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
-        sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
-        List<HostVO> allManagedAgents = sc.list();
-
-        int avLoad = 0;
-
-        if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) {
-            avLoad = allManagedAgents.size() / allMS.size();
-        } else {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() + "; number of managed agents is " + allManagedAgents.size());
-            }
-            return;
-        }
-
-        if (avLoad == 0L) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("As calculated average load is less than 1, rounding it to 1");
-            }
-            avLoad = 1;
-        }
-
-        for (ManagementServerHostVO node : allMS) {
-            if (node.getMsid() != _nodeId) {
-
-                List<HostVO> hostsToRebalance = new ArrayList<HostVO>();
-                for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
-                    hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad);
-                    if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
-                        break;
-                    } else {
-                        s_logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid());
-                    }
-                }
-
-
-                if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
-                    s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid());
-                    for (HostVO host : hostsToRebalance) {
-                        long hostId = host.getId();
-                        s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
-                        boolean result = true;
-
-                        if (_hostTransferDao.findById(hostId) != null) {
-                            s_logger.warn("Somebody else is already rebalancing host id: " + hostId);
-                            continue;
-                        }
-
-                        HostTransferMapVO transfer = null;
-                        try {
-                            transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
-                            Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
-                            if (answer == null) {
-                                s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
-                                result = false;
-                            }
-                        } catch (Exception ex) {
-                            s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
-                            result = false;
-                        } finally {
-                            if (transfer != null) {
-                                HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
-                                if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) {
-                                    if (s_logger.isDebugEnabled()) {
-                                        s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
-                                    }
-                                    //just remove the mapping (if exists) as nothing was done on the peer management server yet
-                                    _hostTransferDao.remove(transfer.getId());
-                                }
-                            }
-                        }
-                    }
-                } else {
-                    s_logger.debug("Found no hosts to rebalance from the management server " + node.getMsid());
-                }
-            }
-        }
-    }
-
-    private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) {
-        TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
-        Commands commands = new Commands(Command.OnError.Stop);
-        commands.addCommand(transfer);
-
-        Command[] cmds = commands.toCommands();
-
-        try {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
-            }
-            String peerName = Long.toString(peer);
-            String cmdStr = _gson.toJson(cmds);
-            String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
-            Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
-            return answers;
-        } catch (Exception e) {
-            s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
-            return null;
-        }
-    }
-
-    public String getPeerName(long agentHostId) {
-
-        HostVO host = _hostDao.findById(agentHostId);
-        if (host != null && host.getManagementServerId() != null) {
-            if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
-                return null;
-            }
-
-            return Long.toString(host.getManagementServerId());
-        }
-        return null;
-    }
-
-
-    public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
-        final String msPeer = getPeerName(agentId);
-        if (msPeer == null) {
-            return null;
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
-        }
-        Command[] cmds = new Command[1];
-        cmds[0] = new ChangeAgentCommand(agentId, event);
-
-        String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
-        if (ansStr == null) {
-            throw new AgentUnavailableException(agentId);
-        }
-        
-        Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Result for agent change is " + answers[0].getResult());
-        }
-
-        return answers[0].getResult();
-    }
-
-    private Runnable getTransferScanTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId);
-                    }
-                    synchronized (_agentToTransferIds) {
-                        if (_agentToTransferIds.size() > 0) {
-                            s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
-                            //for (Long hostId : _agentToTransferIds) {
-                            for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
-                                Long hostId = iterator.next();
-                                AgentAttache attache = findAttache(hostId);
-
-                                // if the thread:
-                                // 1) timed out waiting for the host to reconnect
-                                // 2) recipient management server is not active any more
-                                // 3) if the management server doesn't own the host any more
-                                // remove the host from re-balance list and delete from op_host_transfer DB
-                                // no need to do anything with the real attache as we haven't modified it yet
-                                Date cutTime = DateUtil.currentGMTTime();
-                                HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
-
-                                if (transferMap == null) {
-                                    s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
-                                    iterator.remove();
-                                    _hostTransferDao.completeAgentTransfer(hostId);
-                                    continue;
-                                }
-
-                                if (transferMap.getInitialOwner() != _nodeId || attache == null || attache.forForward()) {
-                                    s_logger.debug("Management server " + _nodeId + " doesn't own host id=" + hostId + " any more, skipping rebalance for the host");
-                                    iterator.remove();
-                                    _hostTransferDao.completeAgentTransfer(hostId);
-                                    continue;
-                                }
-
-                                ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
-                                if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
-                                    s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
-                                    iterator.remove();
-                                    _hostTransferDao.completeAgentTransfer(hostId);
-                                    continue;
-                                }
-
-                                if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
-                                    iterator.remove();
-                                    try {
-                                        _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner()));
-                                    } catch (RejectedExecutionException ex) {
-                                        s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution");
-                                        continue;
-                                    }
-
-                                } else {
-                                    s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
-                                }
-                            }
-                        } else {
-                            if (s_logger.isTraceEnabled()) {
-                                s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
-                            }
-                        }
-                    }
-
-                } catch (Throwable e) {
-                    s_logger.error("Problem with the clustered agent transfer scan check!", e);
-                }
-            }
-        };
-    }
-
-
-    private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) {
-        s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
-        synchronized (_agentToTransferIds) {
-            return  _agentToTransferIds.add(hostId);
-        }
-    }
-
-
-    protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{
-
-        boolean result = true;
-        if (currentOwnerId == _nodeId) {
-            if (!startRebalance(hostId)) {
-                s_logger.debug("Failed to start agent rebalancing");
-                finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
-                return false;
-            }
-            try {
-                Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
-                if (answer == null || !answer[0].getResult()) {
-                    result = false;
-                }
-
-            } catch (Exception ex) {
-                s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex);
-                result = false;
-            }
-
-            if (result) {
-                s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId);
-                finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted);
-            } else {
-                s_logger.warn("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId);
-                finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
-            }
-
-        } else if (futureOwnerId == _nodeId) {
-            HostVO host = _hostDao.findById(hostId);
-            try {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Disconnecting host " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification");
-                }
-
-                AgentAttache attache = findAttache(hostId);
-                if (attache != null) {
-                    result = handleDisconnect(attache, Event.AgentDisconnected, false, false, true);
-                }
-
-                if (result) {
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
-                    }
-                    result = loadDirectlyConnectedHost(host, true);
-                } else {
-                    s_logger.warn("Failed to disconnect " + host.getId() + "(" + host.getName() +
-                            " as a part of rebalance process without notification");
-                }
-
-            } catch (Exception ex) {
-                s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex);
-                result = false;
-            }
-
-            if (result) {
-                s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
-            } else {
-                s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process");
-            }
-        }
-
-        return result;
-    }
-
-
-    protected void finishRebalance(final long hostId, long futureOwnerId, Event event){
-
-        boolean success = (event == Event.RebalanceCompleted) ? true : false;
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event);
-        }
-
-        AgentAttache attache = findAttache(hostId);
-        if (attache == null || !(attache instanceof ClusteredAgentAttache)) {
-            s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already");
-            _hostTransferDao.completeAgentTransfer(hostId);
-            return;
-        }
-
-        ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
-
-        if (success) {
-
-            //1) Set transfer mode to false - so the agent can start processing requests normally
-            forwardAttache.setTransferMode(false);
-
-            //2) Get all transfer requests and route them to peer
-            Request requestToTransfer = forwardAttache.getRequestToTransfer();
-            while (requestToTransfer != null) {
-                s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
-                boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
-                if (!routeResult) {
-                    logD(requestToTransfer.getBytes(), "Failed to route request to peer");
-                }
-
-                requestToTransfer = forwardAttache.getRequestToTransfer();
-            }
-
-            s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance to " + futureOwnerId);
-
-        } else {
-            failRebalance(hostId);
-        }
-
-        s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance");
-        _hostTransferDao.completeAgentTransfer(hostId);
-    }
-
-    protected void failRebalance(final long hostId){
-        try {
-            s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId);
-            _hostTransferDao.completeAgentTransfer(hostId);
-            handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed, true, true);
-        } catch (Exception ex) {
-            s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup");
-        }
-    }
-
-    protected boolean startRebalance(final long hostId) {
-        HostVO host = _hostDao.findById(hostId);
-
-        if (host == null || host.getRemoved() != null) {
-            s_logger.warn("Unable to find host record, fail start rebalancing process");
-            return false;
-        }
-
-        synchronized (_agents) {
-            ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
-            if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
-            	handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
-                ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
-                if (forwardAttache == null) {
-                    s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
-                    return false;
-                }
-                s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
-                forwardAttache.setTransferMode(true);
-                _agents.put(hostId, forwardAttache);
-            } else {
-                if (attache == null) {
-                    s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
-                } else {
-                    s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " + attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
-                }
-                return false;
-            }
-        }
-        _hostTransferDao.startAgentTransfer(hostId);
-        return true;
-    }
-
-    protected void cleanupTransferMap(long msId) {
-        List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId);
-
-        for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) {
-            _hostTransferDao.remove(hostJoingingCluster.getId());
-        }
-
-        List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId);
-        for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
-            _hostTransferDao.remove(hostLeavingCluster.getId());
-        }
-    }
-
-
-    protected class RebalanceTask implements Runnable {
-        Long hostId = null;
-        Long currentOwnerId = null;
-        Long futureOwnerId = null;
-
-
-        public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) {
-            this.hostId = hostId;
-            this.currentOwnerId = currentOwnerId;
-            this.futureOwnerId = futureOwnerId;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Rebalancing host id=" + hostId);
-                }
-                rebalanceHost(hostId, currentOwnerId, futureOwnerId);
-            } catch (Exception e) {
-                s_logger.warn("Unable to rebalance host id=" + hostId, e);
-            }
-        }
-    }
-
-    private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
-        }
-
-        try {
-            scheduleHostScanTask();
-        } catch (Exception e) {
-            // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
-            // happens at fixed intervals anyways. So handling any exceptions that may be thrown
-            s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName()
-                    + ", ignoring as regular host scan happens at fixed interval anyways", e);
-            return null;
-        }
-
-        Answer[] answers = new Answer[1];
-        answers[0] = new Answer(cmd, true, null);
-        return _gson.toJson(answers);
-    }
-
-    public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
-        Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue);
-        for (Command cmd : cmds) {
-            commands.addCommand(cmd);
-        }
-        return send(hostId, commands);
-    }
-
-
-    protected class ClusterDispatcher implements ClusterManager.Dispatcher {
-        @Override
-        public String getName() {
-            return "ClusterDispatcher";
-        }
-
-        @Override
-        public String dispatch(ClusterServicePdu pdu) {
-
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
-            }
-
-            Command[] cmds = null;
-            try {
-                cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
-            } catch (Throwable e) {
-                assert (false);
-                s_logger.error("Excection in gson decoding : ", e);
-            }
-
-            if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) {  //intercepted
-                ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
-
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
-                }
-                boolean result = false;
-                try {
-                    result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Result is " + result);
-                    }
-
-                } catch (AgentUnavailableException e) {
-                    s_logger.warn("Agent is unavailable", e);
-                    return null;
-                }
-
-                Answer[] answers = new Answer[1];
-                answers[0] = new ChangeAgentAnswer(cmd, result);
-                return _gson.toJson(answers);
-            } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
-                TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
-
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
-                }
-                boolean result = false;
-                try {
-                    result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Result is " + result);
-                    }
-
-                } catch (AgentUnavailableException e) {
-                    s_logger.warn("Agent is unavailable", e);
-                    return null;
-                } catch (OperationTimedoutException e) {
-                    s_logger.warn("Operation timed out", e);
-                    return null;
-                }
-                Answer[] answers = new Answer[1];
-                answers[0] = new Answer(cmd, result, null);
-                return _gson.toJson(answers);
-            } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
-                PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
-
-                s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
-
-                boolean result = false;
-                try {
-                    result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent());
-                    s_logger.debug("Result is " + result);
-                } catch (AgentUnavailableException ex) {
-                    s_logger.warn("Agent is unavailable", ex);
-                    return null;
-                }
-
-                Answer[] answers = new Answer[1];
-                answers[0] = new Answer(cmd, result, null);
-                return _gson.toJson(answers);
-            } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
-                ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
-                String response = handleScheduleHostScanTaskCommand(cmd);
-                return response;
-            }
-
-            try {
-                long startTick = System.currentTimeMillis();
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
-                }
-
-                Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
-                if (answers != null) {
-                    String jsonReturn = _gson.toJson(answers);
-
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
-                                " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
-                    }
-
-                    return jsonReturn;
-                } else {
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
-                                " in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
-                    }
-                }
-            } catch (AgentUnavailableException e) {
-                s_logger.warn("Agent is unavailable", e);
-            } catch (OperationTimedoutException e) {
-                s_logger.warn("Timed Out", e);
-            }
-
-            return null;
-        }
-
-    }
-
-    public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
-        return executeUserRequest(agentId, event);
-    }
-
-    public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
-        return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
-    }
-
-    public boolean isAgentRebalanceEnabled() {
-        return _agentLBEnabled.value();
-    }
-
-    private ClusteredAgentRebalanceService _rebalanceService;
-
-    boolean _agentLbHappened = false;
-    public void agentrebalance() {
-        Profiler profilerAgentLB = new Profiler();
-        profilerAgentLB.start();
-        //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
-        if (_agentLBEnabled.value() && !_agentLbHappened) {
-            SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
-            sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
-            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
-            List<HostVO> allManagedRoutingAgents = sc.list();
-
-            sc = SearchCriteria2.create(HostVO.class);
-            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
-            List<HostVO> allAgents = sc.list();
-            double allHostsCount = allAgents.size();
-            double managedHostsCount = allManagedRoutingAgents.size();
-            if (allHostsCount > 0.0) {
-                double load = managedHostsCount / allHostsCount;
-                if (load >= _connectedAgentsThreshold.value()) {
-                    s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
-                    _rebalanceService.scheduleRebalanceAgents();
-                    _agentLbHappened = true;
-                } else {
-                    s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
-                }
-            }
-        }
-        profilerAgentLB.stop();
-    }
-    
-    @Override
-    public ConfigKey<?>[] getConfigKeys() {
-        ConfigKey<?>[] keys = super.getConfigKeys();
-        
-        List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>();
-        keysLst.addAll(Arrays.asList(keys));
-        keysLst.add(EnableLB);
-        keysLst.add(ConnectedAgentThreshold);
-        keysLst.add(LoadSize);
-        keysLst.add(ScanInterval);
-        return keysLst.toArray(new ConfigKey<?>[keysLst.size()]);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
deleted file mode 100755
index 9012433..0000000
--- a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import com.cloud.agent.AgentManager;
-import com.cloud.agent.transport.Request;
-import com.cloud.agent.transport.Response;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.UnsupportedVersionException;
-import com.cloud.resource.ServerResource;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
-    private final ClusteredAgentManagerImpl _mgr;
-    private final long _nodeId;
-
-    public ClusteredDirectAgentAttache(AgentManagerImpl agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
-        super(agentMgr, id, resource, maintenance, mgr);
-        _mgr = mgr;
-        _nodeId = mgmtId;
-    }
-
-    @Override
-    public void routeToAgent(byte[] data) throws AgentUnavailableException {
-        Request req;
-        try {
-            req = Request.parse(data);
-        } catch (ClassNotFoundException e) {
-            throw new CloudRuntimeException("Unable to rout to an agent ", e);
-        } catch (UnsupportedVersionException e) {
-            throw new CloudRuntimeException("Unable to rout to an agent ", e);
-        }
-
-        if (req instanceof Response) {
-            super.process(((Response) req).getAnswers());
-        } else {
-            super.send(req);
-        }
-    }
-
-    @Override
-    public boolean processAnswers(long seq, Response response) {
-        long mgmtId = response.getManagementServerId();
-        if (mgmtId != -1 && mgmtId != _nodeId) {
-            _mgr.routeToPeer(Long.toString(mgmtId), response.getBytes());
-            if (response.executeInSequence()) {
-                sendNext(response.getSequence());
-            }
-            return true;
-        } else {
-            return super.processAnswers(seq, response);
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java b/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java
deleted file mode 100755
index e5d2867..0000000
--- a/server/src/com/cloud/agent/manager/ConnectedAgentAttache.java
+++ /dev/null
@@ -1,95 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import java.nio.channels.ClosedChannelException;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.agent.transport.Request;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.host.Status;
-import com.cloud.utils.nio.Link;
-
-/**
- * ConnectedAgentAttache implements an direct connection to this management server.
- */
-public class ConnectedAgentAttache extends AgentAttache {
-    private static final Logger s_logger = Logger.getLogger(ConnectedAgentAttache.class);
-
-    protected Link _link;
-
-    public ConnectedAgentAttache(AgentManagerImpl agentMgr, final long id, final Link link, boolean maintenance) {
-        super(agentMgr, id, maintenance);
-        _link = link;
-    }
-
-    @Override
-    public synchronized void send(Request req) throws AgentUnavailableException {
-        try {
-            _link.send(req.toBytes());
-        } catch (ClosedChannelException e) {
-            throw new AgentUnavailableException("Channel is closed", _id);
-        }
-    }
-
-    @Override
-    public synchronized boolean isClosed() {
-        return _link == null;
-    }
-
-    @Override
-    public void disconnect(final Status state) {
-        synchronized (this) {
-            s_logger.debug("Processing Disconnect.");
-            if (_link != null) {
-                _link.close();
-                _link.terminated();
-            }
-            _link = null;
-        }
-        cancelAllCommands(state, true);
-        _requests.clear();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        try {
-            ConnectedAgentAttache that = (ConnectedAgentAttache) obj;
-            return super.equals(obj) && _link == that._link && _link != null;
-        } catch (ClassCastException e) {
-            assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to " + this.getClass().getSimpleName() + ".equals()? ";
-        return false;
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        try {
-            assert _link == null : "Duh...Says you....Forgot to call disconnect()!";
-            synchronized (this) {
-                if (_link != null) {
-                    s_logger.warn("Lost attache " + _id);
-                    disconnect(Status.Alert);
-                }
-            }
-        } finally {
-            super.finalize();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/DirectAgentAttache.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/DirectAgentAttache.java b/server/src/com/cloud/agent/manager/DirectAgentAttache.java
deleted file mode 100755
index 5b5d8d2..0000000
--- a/server/src/com/cloud/agent/manager/DirectAgentAttache.java
+++ /dev/null
@@ -1,219 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.CronCommand;
-import com.cloud.agent.api.PingCommand;
-import com.cloud.agent.api.StartupAnswer;
-import com.cloud.agent.transport.Request;
-import com.cloud.agent.transport.Response;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.host.Status;
-import com.cloud.host.Status.Event;
-import com.cloud.resource.ServerResource;
-
-public class DirectAgentAttache extends AgentAttache {
-    private final static Logger s_logger = Logger.getLogger(DirectAgentAttache.class);
-
-    ServerResource _resource;
-    List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
-    AgentManagerImpl _mgr;
-    long _seq = 0;
-
-    public DirectAgentAttache(AgentManagerImpl agentMgr, long id, ServerResource resource, boolean maintenance, AgentManagerImpl mgr) {
-        super(agentMgr, id, maintenance);
-        _resource = resource;
-        _mgr = mgr;
-    }
-
-    @Override
-    public void disconnect(Status state) {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Processing disconnect " + _id);
-        }
-
-        for (ScheduledFuture<?> future : _futures) {
-            future.cancel(false);
-        }
-
-        synchronized(this) {
-            if( _resource != null ) {
-                _resource.disconnected();
-                _resource = null;
-            }
-        }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof DirectAgentAttache)) {
-            return false;
-        }
-        return super.equals(obj);
-    }
-
-    @Override
-    public synchronized boolean isClosed() {
-        return _resource == null;
-    }
-
-    @Override
-    public void send(Request req) throws AgentUnavailableException {
-        req.logD("Executing: ", true);
-        if (req instanceof Response) {
-            Response resp = (Response)req;
-            Answer[] answers = resp.getAnswers();
-            if (answers != null && answers[0] instanceof StartupAnswer) {
-                StartupAnswer startup = (StartupAnswer)answers[0];
-                int interval = startup.getPingInterval();
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
-            }
-        } else {
-            Command[] cmds = req.getCommands();
-            if (cmds.length > 0 && !(cmds[0] instanceof CronCommand)) {
-                _agentMgr.getDirectAgentPool().execute(new Task(req));
-            } else {
-                CronCommand cmd = (CronCommand)cmds[0];
-                _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new Task(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
-            }
-        }
-    }
-
-    @Override
-    public void process(Answer[] answers) {
-        if (answers != null && answers[0] instanceof StartupAnswer) {
-            StartupAnswer startup = (StartupAnswer)answers[0];
-            int interval = startup.getPingInterval();
-            s_logger.info("StartupAnswer received " + startup.getHostId() + " Interval = " + interval );
-            _futures.add(_agentMgr.getDirectAgentPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        try {
-            assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";
-            synchronized (this) {
-                if (_resource != null) {
-                    s_logger.warn("Lost attache for " + _id);
-                    disconnect(Status.Alert);
-                }
-            }
-        } finally {
-            super.finalize();
-        }
-    }
-
-    protected class PingTask implements Runnable {
-        @Override
-        public synchronized void run() {
-            try {
-                ServerResource resource = _resource;
-
-                if (resource != null) {
-                    PingCommand cmd = resource.getCurrentStatus(_id);
-                    if (cmd == null) {
-                        s_logger.warn("Unable to get current status on " + _id);
-                        _mgr.disconnectWithInvestigation(DirectAgentAttache.this, Event.AgentDisconnected);
-                        return;
-                    }
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Ping from " + _id);
-                    }
-                    long seq = _seq++;
-
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace("SeqA " + _id + "-" + seq + ": " + new Request(_id, -1, cmd, false).toString());
-                    }
-
-                    _mgr.handleCommands(DirectAgentAttache.this, seq, new Command[]{cmd});
-                } else {
-                    s_logger.debug("Unable to send ping because agent is disconnected " + _id);
-                }
-            } catch (Exception e) {
-                s_logger.warn("Unable to complete the ping task", e);
-            }
-        }
-    }
-
-
-    protected class Task implements Runnable {
-        Request _req;
-
-        public Task(Request req) {
-            _req = req;
-        }
-
-        @Override
-        public void run() {
-            long seq = _req.getSequence();
-            try {
-                ServerResource resource = _resource;
-                Command[] cmds = _req.getCommands();
-                boolean stopOnError = _req.stopOnError();
-
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(log(seq, "Executing request"));
-                }
-                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
-                for (int i = 0; i < cmds.length; i++) {
-                    Answer answer = null;
-                    try {
-                        if (resource != null) {
-                            answer = resource.executeRequest(cmds[i]);
-                            if(answer == null) {
-                            	s_logger.warn("Resource returned null answer!");
-                                answer = new Answer(cmds[i], false, "Resource returned null answer");
-                            }
-                        } else {
-                            answer = new Answer(cmds[i], false, "Agent is disconnected");
-                        }
-                    } catch (Exception e) {
-                        s_logger.warn(log(seq, "Exception Caught while executing command"), e);
-                        answer = new Answer(cmds[i], false, e.toString());
-                    }
-                    answers.add(answer);
-                    if (!answer.getResult() && stopOnError) {
-                        if (i < cmds.length - 1 && s_logger.isDebugEnabled()) {
-                            s_logger.debug(log(seq, "Cancelling because one of the answers is false and it is stop on error."));
-                        }
-                        break;
-                    }
-                }
-
-                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(log(seq, "Response Received: "));
-                }
-
-                processAnswers(seq, resp);
-            } catch (Exception e) {
-                s_logger.warn(log(seq, "Exception caught "), e);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/DummyAttache.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/DummyAttache.java b/server/src/com/cloud/agent/manager/DummyAttache.java
deleted file mode 100755
index 182c1b8..0000000
--- a/server/src/com/cloud/agent/manager/DummyAttache.java
+++ /dev/null
@@ -1,48 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import com.cloud.agent.transport.Request;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.host.Status;
-
-
-public class DummyAttache extends AgentAttache {
-
-	public DummyAttache(AgentManagerImpl agentMgr, long id, boolean maintenance) {
-		super(agentMgr, id, maintenance);
-	}
-
-
-	@Override
-	public void disconnect(Status state) {
-
-	}
-
-	
-	@Override
-	protected boolean isClosed() {
-		return false;
-	}
-
-	
-	@Override
-	public void send(Request req) throws AgentUnavailableException {
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/Routable.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/Routable.java b/server/src/com/cloud/agent/manager/Routable.java
deleted file mode 100644
index c8eb185..0000000
--- a/server/src/com/cloud/agent/manager/Routable.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import com.cloud.exception.AgentUnavailableException;
-
-public interface Routable {
-    /**
-     * Directly rout this data to the agent.
-     * 
-     * @param data
-     * @throws AgentUnavailableException
-     */
-    void routeToAgent(byte[] data) throws AgentUnavailableException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/SynchronousListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/SynchronousListener.java b/server/src/com/cloud/agent/manager/SynchronousListener.java
deleted file mode 100755
index 3698705..0000000
--- a/server/src/com/cloud/agent/manager/SynchronousListener.java
+++ /dev/null
@@ -1,135 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.agent.Listener;
-import com.cloud.agent.api.AgentControlAnswer;
-import com.cloud.agent.api.AgentControlCommand;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.StartupCommand;
-import com.cloud.host.Host;
-import com.cloud.host.Status;
-import com.cloud.utils.Profiler;
-
-public class SynchronousListener implements Listener {
-    private static final Logger s_logger = Logger.getLogger(SynchronousListener.class);
-	
-    protected Answer[] _answers;
-    protected boolean _disconnected;
-    protected String _peer;
-
-    public SynchronousListener(Listener listener) {
-        _answers = null;
-        _peer = null;
-    }
-    
-    public void setPeer(String peer) {
-        _peer = peer;
-    }
-    
-    public String getPeer() {
-        return _peer;
-    }
-    
-    public Answer[] getAnswers() {
-        return _answers;
-    }
-    
-    @Override
-    public boolean isRecurring() {
-        return false;
-    }
-    
-    public boolean isDisconnected() {
-        return _disconnected;
-    }
-
-    @Override
-    public synchronized boolean processAnswers(long agentId, long seq, Answer[] resp) {
-        _answers = resp;
-        notifyAll();
-        return true;
-    }
-    
-    @Override
-    public synchronized boolean processDisconnect(long agentId, Status state) {
-    	if(s_logger.isTraceEnabled())
-    		s_logger.trace("Agent disconnected, agent id: " + agentId + ", state: " + state + ". Will notify waiters");
-    	
-        _disconnected = true;
-        notifyAll();
-        return true;
-    }
-    
-    @Override
-    public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance) {
-    }
-
-    @Override
-    public boolean processCommands(long agentId, long seq, Command[] req) {
-        return false;
-    }
-    
-    @Override
-    public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
-    	return null;
-    }
-    
-    public Answer[] waitFor() throws InterruptedException {
-        return waitFor(-1);
-    }
-    
-    public synchronized Answer[] waitFor(int s) throws InterruptedException {
-        if (_disconnected) {
-            return null;
-        }
-        
-        if (_answers != null) {
-            return _answers;
-        }
-
-        Profiler profiler = new Profiler();
-        profiler.start();
-        if (s <= 0) {
-            wait();
-        } else {
-            int ms = s * 1000;
-            wait(ms);
-        }
-        profiler.stop();
-        
-        if(s_logger.isTraceEnabled()) {
-        	s_logger.trace("Synchronized command - sending completed, time: " + profiler.getDuration() + ", answer: " +
-    			(_answers != null ? _answers[0].toString() : "null"));
-        }
-        return _answers;
-    }
-    
-    @Override
-    public boolean processTimeout(long agentId, long seq) {
-    	return true;
-    }
-    
-    @Override
-    public int getTimeout() {
-    	return -1;
-    }
-    
-}


Mime
View raw message