Return-Path: X-Original-To: apmail-incubator-cloudstack-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-cloudstack-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8476BE196 for ; Thu, 10 Jan 2013 22:47:26 +0000 (UTC) Received: (qmail 37203 invoked by uid 500); 10 Jan 2013 22:47:21 -0000 Delivered-To: apmail-incubator-cloudstack-commits-archive@incubator.apache.org Received: (qmail 37157 invoked by uid 500); 10 Jan 2013 22:47:21 -0000 Mailing-List: contact cloudstack-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cloudstack-dev@incubator.apache.org Delivered-To: mailing list cloudstack-commits@incubator.apache.org Received: (qmail 36615 invoked by uid 99); 10 Jan 2013 22:47:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Jan 2013 22:47:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7B03717B5B; Thu, 10 Jan 2013 22:47:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ahuang@apache.org To: cloudstack-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [5/25] removed componentlocator and inject Message-Id: <20130110224720.7B03717B5B@tyr.zones.apache.org> Date: Thu, 10 Jan 2013 22:47:20 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeManagerImpl.java b/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeManagerImpl.java index 98c6b69..77a5949 100644 --- a/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeManagerImpl.java +++ b/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeManagerImpl.java @@ -18,6 +18,8 @@ */ package org.apache.cloudstack.storage.volume; +import javax.inject.Inject; + import org.apache.cloudstack.engine.subsystem.api.storage.VolumeProfile; import org.apache.cloudstack.storage.volume.db.VolumeDao2; import org.apache.cloudstack.storage.volume.db.VolumeVO; @@ -26,7 +28,6 @@ import org.springframework.stereotype.Component; import com.cloud.storage.Volume; import com.cloud.storage.Volume.Event; import com.cloud.storage.Volume.State; -import com.cloud.utils.component.Inject; import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; @@ -39,6 +40,7 @@ public class VolumeManagerImpl implements VolumeManager { initStateMachine(); } + @Override public VolumeVO allocateDuplicateVolume(VolumeVO oldVol) { /* VolumeVO newVol = new VolumeVO(oldVol.getVolumeType(), oldVol.getName(), oldVol.getDataCenterId(), oldVol.getDomainId(), oldVol.getAccountId(), oldVol.getDiskOfferingId(), oldVol.getSize()); @@ -47,58 +49,62 @@ public class VolumeManagerImpl implements VolumeManager { newVol.setInstanceId(oldVol.getInstanceId()); newVol.setRecreatable(oldVol.isRecreatable()); newVol.setReservationId(oldVol.getReservationId()); - */ + */ return null; // return _volumeDao.persist(newVol); } - + private void initStateMachine() { - s_fsm.addTransition(Volume.State.Allocated, Event.CreateRequested, Volume.State.Creating); - s_fsm.addTransition(Volume.State.Allocated, Event.DestroyRequested, Volume.State.Destroying); - s_fsm.addTransition(Volume.State.Creating, Event.OperationRetry, Volume.State.Creating); - s_fsm.addTransition(Volume.State.Creating, Event.OperationFailed, Volume.State.Allocated); - s_fsm.addTransition(Volume.State.Creating, Event.OperationSucceeded, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Creating, Event.DestroyRequested, Volume.State.Destroying); - s_fsm.addTransition(Volume.State.Creating, Event.CreateRequested, Volume.State.Creating); - s_fsm.addTransition(Volume.State.Allocated, Event.UploadRequested, Volume.State.UploadOp); - s_fsm.addTransition(Volume.State.UploadOp, Event.CopyRequested, Volume.State.Creating);// CopyRequested for volume from sec to primary storage - s_fsm.addTransition(Volume.State.Creating, Event.CopySucceeded, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Creating, Event.CopyFailed, Volume.State.UploadOp);// Copying volume from sec to primary failed. - s_fsm.addTransition(Volume.State.UploadOp, Event.DestroyRequested, Volume.State.Destroying); - s_fsm.addTransition(Volume.State.Ready, Event.DestroyRequested, Volume.State.Destroying); - s_fsm.addTransition(Volume.State.Destroy, Event.ExpungingRequested, Volume.State.Expunging); - s_fsm.addTransition(Volume.State.Ready, Event.SnapshotRequested, Volume.State.Snapshotting); - s_fsm.addTransition(Volume.State.Snapshotting, Event.OperationSucceeded, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Snapshotting, Event.OperationFailed, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Ready, Event.MigrationRequested, Volume.State.Migrating); - s_fsm.addTransition(Volume.State.Migrating, Event.OperationSucceeded, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Migrating, Event.OperationFailed, Volume.State.Ready); - s_fsm.addTransition(Volume.State.Destroy, Event.OperationSucceeded, Volume.State.Destroy); - s_fsm.addTransition(Volume.State.Destroying, Event.OperationSucceeded, Volume.State.Destroy); - s_fsm.addTransition(Volume.State.Destroying, Event.OperationFailed, Volume.State.Destroying); - s_fsm.addTransition(Volume.State.Destroying, Event.DestroyRequested, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Allocated, Event.CreateRequested, Volume.State.Creating); + s_fsm.addTransition(Volume.State.Allocated, Event.DestroyRequested, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Creating, Event.OperationRetry, Volume.State.Creating); + s_fsm.addTransition(Volume.State.Creating, Event.OperationFailed, Volume.State.Allocated); + s_fsm.addTransition(Volume.State.Creating, Event.OperationSucceeded, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Creating, Event.DestroyRequested, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Creating, Event.CreateRequested, Volume.State.Creating); + s_fsm.addTransition(Volume.State.Allocated, Event.UploadRequested, Volume.State.UploadOp); + s_fsm.addTransition(Volume.State.UploadOp, Event.CopyRequested, Volume.State.Creating);// CopyRequested for volume from sec to primary storage + s_fsm.addTransition(Volume.State.Creating, Event.CopySucceeded, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Creating, Event.CopyFailed, Volume.State.UploadOp);// Copying volume from sec to primary failed. + s_fsm.addTransition(Volume.State.UploadOp, Event.DestroyRequested, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Ready, Event.DestroyRequested, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Destroy, Event.ExpungingRequested, Volume.State.Expunging); + s_fsm.addTransition(Volume.State.Ready, Event.SnapshotRequested, Volume.State.Snapshotting); + s_fsm.addTransition(Volume.State.Snapshotting, Event.OperationSucceeded, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Snapshotting, Event.OperationFailed, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Ready, Event.MigrationRequested, Volume.State.Migrating); + s_fsm.addTransition(Volume.State.Migrating, Event.OperationSucceeded, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Migrating, Event.OperationFailed, Volume.State.Ready); + s_fsm.addTransition(Volume.State.Destroy, Event.OperationSucceeded, Volume.State.Destroy); + s_fsm.addTransition(Volume.State.Destroying, Event.OperationSucceeded, Volume.State.Destroy); + s_fsm.addTransition(Volume.State.Destroying, Event.OperationFailed, Volume.State.Destroying); + s_fsm.addTransition(Volume.State.Destroying, Event.DestroyRequested, Volume.State.Destroying); } - + @Override public StateMachine2 getStateMachine() { return s_fsm; } + @Override public VolumeVO processEvent(Volume vol, Volume.Event event) throws NoTransitionException { // _volStateMachine.transitTo(vol, event, null, _volumeDao); return _volumeDao.findById(vol.getId()); } + @Override public VolumeProfile getProfile(long volumeId) { // TODO Auto-generated method stub return null; } + @Override public VolumeVO getVolume(long volumeId) { // TODO Auto-generated method stub return null; } + @Override public VolumeVO updateVolume(VolumeVO volume) { // TODO Auto-generated method stub return null; http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeObject.java ---------------------------------------------------------------------- diff --git a/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeObject.java b/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeObject.java index a1eeb65..4ec6fba 100644 --- a/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeObject.java +++ b/engine/storage/volume/src/org/apache/cloudstack/storage/volume/VolumeObject.java @@ -13,13 +13,11 @@ import org.apache.cloudstack.engine.subsystem.api.storage.type.VolumeTypeHelper; import org.apache.cloudstack.storage.datastore.PrimaryDataStore; import org.apache.cloudstack.storage.volume.db.VolumeDao2; import org.apache.cloudstack.storage.volume.db.VolumeVO; - import org.apache.log4j.Logger; import com.cloud.storage.Volume; import com.cloud.storage.Volume.State; -import com.cloud.utils.component.ComponentInject; -import com.cloud.utils.db.DB; +import com.cloud.utils.component.ComponentContext; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.fsm.NoTransitionException; import com.cloud.utils.fsm.StateMachine2; @@ -41,13 +39,14 @@ public class VolumeObject implements VolumeInfo { this.volumeVO = volumeVO; this.dataStore = dataStore; } - + public static VolumeObject getVolumeObject(PrimaryDataStore dataStore, VolumeVO volumeVO) { VolumeObject vo = new VolumeObject(dataStore, volumeVO); - vo = ComponentInject.inject(vo); + vo = ComponentContext.inject(vo); return vo; } + @Override public String getUuid() { return volumeVO.getUuid(); } @@ -56,14 +55,17 @@ public class VolumeObject implements VolumeInfo { volumeVO.setUuid(uuid); } + @Override public String getPath() { return volumeVO.getPath(); } + @Override public String getTemplateUuid() { return null; } + @Override public String getTemplatePath() { return null; } @@ -76,18 +78,22 @@ public class VolumeObject implements VolumeInfo { return volumeVO.getState(); } + @Override public PrimaryDataStore getDataStore() { return dataStore; } + @Override public long getSize() { return volumeVO.getSize(); } + @Override public VolumeDiskType getDiskType() { return diskTypeHelper.getDiskType(volumeVO.getDiskType()); } + @Override public VolumeType getType() { return volumeTypeHelper.getType(volumeVO.getVolumeType()); } @@ -153,7 +159,7 @@ public class VolumeObject implements VolumeInfo { // TODO Auto-generated method stub return null; } - + @Override public String getName() { return this.volumeVO.getName(); http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/plugins/acl/static-role-based/src/org/apache/cloudstack/acl/StaticRoleBasedAPIAccessChecker.java ---------------------------------------------------------------------- diff --git a/plugins/acl/static-role-based/src/org/apache/cloudstack/acl/StaticRoleBasedAPIAccessChecker.java b/plugins/acl/static-role-based/src/org/apache/cloudstack/acl/StaticRoleBasedAPIAccessChecker.java index 9236fba..b6740ed 100644 --- a/plugins/acl/static-role-based/src/org/apache/cloudstack/acl/StaticRoleBasedAPIAccessChecker.java +++ b/plugins/acl/static-role-based/src/org/apache/cloudstack/acl/StaticRoleBasedAPIAccessChecker.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Properties; import javax.ejb.Local; +import javax.inject.Inject; import javax.naming.ConfigurationException; import org.apache.log4j.Logger; @@ -38,7 +39,6 @@ import com.cloud.user.AccountManager; import com.cloud.user.User; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.component.AdapterBase; -import com.cloud.utils.component.Inject; import com.cloud.utils.component.PluggableService; /* @@ -60,8 +60,8 @@ public class StaticRoleBasedAPIAccessChecker extends AdapterBase implements APIA private static List s_resourceDomainAdminCommands = null; private static List s_allCommands = null; - protected @Inject AccountManager _accountMgr; - @Inject protected List _services; + @Inject AccountManager _accountMgr; + @Inject List _services; protected StaticRoleBasedAPIAccessChecker() { super(); http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3abf731..c01e125 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ + true 1.6 UTF-8 @@ -88,7 +89,6 @@ 2.6 1.4 0.9.8 - true http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index b6d86e1..77f1fb8 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -90,6 +90,16 @@ cloud-engine-api ${project.version} + + org.apache.cloudstack + cloud-api + ${project.version} + + + org.apache.cloudstack + cloud-core + ${project.version} + install http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/server/src/com/cloud/agent/manager/AgentManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index ee5971f..f7ca034 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -107,7 +107,6 @@ import com.cloud.user.AccountManager; import com.cloud.utils.ActionDelegate; import com.cloud.utils.NumbersUtil; import com.cloud.utils.Pair; -import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.Manager; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; @@ -151,7 +150,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { protected List> _creationMonitors = new ArrayList>(17); protected List _loadingAgents = new ArrayList(); protected int _monitorId = 0; - private Lock _agentStatusLock = new ReentrantLock(); + private final Lock _agentStatusLock = new ReentrantLock(); protected NioServer _connection; @Inject @@ -195,10 +194,10 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { @Inject protected VirtualMachineManager _vmMgr = null; - + @Inject StorageService _storageSvr = null; @Inject StorageManager _storageMgr = null; - + @Inject protected HypervisorGuruManager _hvGuruMgr; @@ -222,11 +221,11 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { protected ExecutorService _executor; protected ThreadPoolExecutor _connectExecutor; - + protected StateMachine2 _statusStateMachine = Status.getStateMachine(); - + @Inject ResourceManager _resourceMgr; - + @Override public boolean configure(final String name, final Map params) throws ConfigurationException { _name = name; @@ -263,7 +262,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { _nodeId = ManagementServerNode.getManagementServerId(); s_logger.info("Configuring AgentManagerImpl. management server node id(msid): " + _nodeId); - + long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout; _hostDao.markHostsAsDisconnected(_nodeId, lastPing); @@ -276,7 +275,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { new LinkedBlockingQueue(), new NamedThreadFactory("AgentConnectTaskPool")); //allow core threads to time out even when there are no items in the queue _connectExecutor.allowCoreThreadTimeOut(true); - + _connection = new NioServer("AgentManager", _port, workers + 10, this); s_logger.info("Listening on " + _port + " with " + workers + " workers"); @@ -395,7 +394,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } else if ( ssHost.getType() == Host.Type.SecondaryStorage) { sendToSSVM(ssHost.getDataCenterId(), cmd, listener); } else { - String err = "do not support Secondary Storage type " + ssHost.getType(); + String err = "do not support Secondary Storage type " + ssHost.getType(); s_logger.warn(err); throw new CloudRuntimeException(err); } @@ -435,7 +434,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } Answer answer = null; try { - + long targetHostId = _hvGuruMgr.getGuruProcessedCommandTargetHost(host.getId(), cmd); answer = easySend(targetHostId, cmd); } catch (Exception e) { @@ -552,7 +551,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { assert cmds.length > 0 : "Why are you sending zero length commands?"; if (cmds.length == 0) { - throw new AgentUnavailableException("Empty command set for agent " + agent.getId(), agent.getId()); + throw new AgentUnavailableException("Empty command set for agent " + agent.getId(), agent.getId()); } Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true); req.setSequence(agent.getNextSequence()); @@ -585,7 +584,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if (removed != null) { removed.disconnect(nextState); } - + for (Pair monitor : _hostMonitors) { if (s_logger.isDebugEnabled()) { s_logger.debug("Sending Disconnect to listener: " + monitor.second().getClass().getName()); @@ -593,7 +592,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { monitor.second().processDisconnect(hostId, nextState); } } - + protected AgentAttache notifyMonitorsOfConnection(AgentAttache attache, final StartupCommand[] cmd, boolean forRebalance) throws ConnectionException { long hostId = attache.getId(); HostVO host = _hostDao.findById(hostId); @@ -678,7 +677,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { loadDirectlyConnectedHost(host, false); } } - + private ServerResource loadResourcesWithoutHypervisor(HostVO host){ String resourceName = host.getResource(); ServerResource resource = null; @@ -704,10 +703,10 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if(resource != null){ _hostDao.loadDetails(host); - + HashMap params = new HashMap(host.getDetails().size() + 5); params.putAll(host.getDetails()); - + params.put("guid", host.getGuid()); params.put("zone", Long.toString(host.getDataCenterId())); if (host.getPodId() != null) { @@ -726,19 +725,19 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { params.put("pool", guid); } } - + params.put("ipaddress", host.getPrivateIpAddress()); params.put("secondary.storage.vm", "false"); params.put("max.template.iso.size", _configDao.getValue(Config.MaxTemplateAndIsoSize.toString())); params.put("migratewait", _configDao.getValue(Config.MigrateWait.toString())); - + try { resource.configure(host.getName(), params); } catch (ConfigurationException e) { s_logger.warn("Unable to configure resource due to " + e.getMessage()); return null; } - + if (!resource.start()) { s_logger.warn("Unable to start the resource"); return null; @@ -746,13 +745,13 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } return resource; } - + @SuppressWarnings("rawtypes") protected boolean loadDirectlyConnectedHost(HostVO host, boolean forRebalance) { - boolean initialized = false; + boolean initialized = false; ServerResource resource = null; - try { + try { //load the respective discoverer Discoverer discoverer = _resourceMgr.getMatchingDiscover(host.getHypervisorType()); if(discoverer == null){ @@ -761,20 +760,20 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { }else{ resource = discoverer.reloadResource(host); } - + if(resource == null){ s_logger.warn("Unable to load the resource: "+ host.getId()); return false; } - - initialized = true; - } finally { - if(!initialized) { + + initialized = true; + } finally { + if(!initialized) { if (host != null) { agentStatusTransitTo(host, Event.AgentDisconnected, _nodeId); } - } - } + } + } if (forRebalance) { Host h = _resourceMgr.createHostAndAgent(host.getId(), resource, host.getDetails(), false, null, true); @@ -790,10 +789,10 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if (resource instanceof DummySecondaryStorageResource || resource instanceof KvmDummyResourceBase) { return new DummyAttache(this, host.getId(), false); } - + s_logger.debug("create DirectAgentAttache for " + host.getId()); DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), resource, host.isInMaintenanceStates(), this); - + AgentAttache old = null; synchronized (_agents) { old = _agents.put(host.getId(), attache); @@ -804,7 +803,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { return attache; } - + @Override public boolean stop() { if (_monitor != null) { @@ -823,13 +822,13 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { s_logger.debug("Cant not find host " + agent.getId()); } } else { - if (!agent.forForward()) { - agentStatusTransitTo(host, Event.ManagementServerDown, _nodeId); - } + if (!agent.forForward()) { + agentStatusTransitTo(host, Event.ManagementServerDown, _nodeId); + } } } } - + _connectExecutor.shutdownNow(); return true; } @@ -838,7 +837,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { public String getName() { return _name; } - + protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState) { long hostId = attache.getId(); @@ -863,7 +862,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { s_logger.debug(err); throw new CloudRuntimeException(err); } - + if (s_logger.isDebugEnabled()) { s_logger.debug("The next status of agent " + hostId + "is " + nextStatus + ", current status is " + currentStatus); } @@ -876,15 +875,15 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { //remove the attache removeAgent(attache, nextStatus); - + //update the DB if (host != null && transitState) { - disconnectAgent(host, event, _nodeId); + disconnectAgent(host, event, _nodeId); } return true; } - + protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) { long hostId = attache.getId(); HostVO host = _hostDao.findById(hostId); @@ -898,7 +897,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { * God knew what race condition the code dealt with! */ } - + if (nextStatus == Status.Alert) { /* OK, we are going to the bad status, let's see what happened */ s_logger.info("Investigating why host " + hostId + " has disconnected with event " + event); @@ -947,7 +946,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { s_logger.debug("The next status of Agent " + host.getId() + " is not Alert, no need to investigate what happened"); } } - + handleDisconnectWithoutInvestigation(attache, event, true); host = _hostDao.findById(host.getId()); if (host.getStatus() == Status.Alert || host.getStatus() == Status.Down) { @@ -970,11 +969,11 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { @Override public void run() { - try { + try { if (_investigate == true) { handleDisconnectWithInvestigation(_attache, _event); } else { - handleDisconnectWithoutInvestigation(_attache, _event, true); + handleDisconnectWithoutInvestigation(_attache, _event, true); } } catch (final Exception e) { s_logger.error("Exception caught while handling disconnect: ", e); @@ -1059,14 +1058,14 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { @Override public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException { - if (event == Event.AgentDisconnected) { + if (event == Event.AgentDisconnected) { if (s_logger.isDebugEnabled()) { s_logger.debug("Received agent disconnect event for host " + hostId); } AgentAttache attache = null; attache = findAttache(hostId); if (attache != null) { - handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true); + handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true); } return true; } else if (event == Event.ShutdownRequested) { @@ -1079,7 +1078,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { s_logger.debug("create ConnectedAgentAttache for " + host.getId()); AgentAttache attache = new ConnectedAgentAttache(this, host.getId(), link, host.isInMaintenanceStates()); link.attach(attache); - + AgentAttache old = null; synchronized (_agents) { old = _agents.put(host.getId(), attache); @@ -1090,36 +1089,36 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { return attache; } - + private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, Request request) { - AgentAttache attache = null; - ReadyCommand ready = null; - try { - HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup); - if (host != null) { - ready = new ReadyCommand(host.getDataCenterId(), host.getId()); - attache = createAttacheForConnect(host, link); - attache = notifyMonitorsOfConnection(attache, startup, false); - } + AgentAttache attache = null; + ReadyCommand ready = null; + try { + HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup); + if (host != null) { + ready = new ReadyCommand(host.getDataCenterId(), host.getId()); + attache = createAttacheForConnect(host, link); + attache = notifyMonitorsOfConnection(attache, startup, false); + } } catch (Exception e) { - s_logger.debug("Failed to handle host connection: " + e.toString()); - ready = new ReadyCommand(null); - ready.setDetails(e.toString()); + s_logger.debug("Failed to handle host connection: " + e.toString()); + ready = new ReadyCommand(null); + ready.setDetails(e.toString()); } finally { if (ready == null) { ready = new ReadyCommand(null); - } + } } - + try { - if (attache == null) { - final Request readyRequest = new Request(-1, -1, ready, false); - link.send(readyRequest.getBytes()); - } else { - easySend(attache.getId(), ready); - } + if (attache == null) { + final Request readyRequest = new Request(-1, -1, ready, false); + link.send(readyRequest.getBytes()); + } else { + easySend(attache.getId(), ready); + } } catch (Exception e) { - s_logger.debug("Failed to send ready command:" + e.toString()); + s_logger.debug("Failed to send ready command:" + e.toString()); } return attache; } @@ -1143,7 +1142,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if (s_logger.isDebugEnabled()) { s_logger.debug("Simulating start for resource " + resource.getName() + " id " + id); } - + _resourceMgr.createHostAndAgent(id, resource, details, false, null, false); } catch (Exception e) { s_logger.warn("Unable to simulate start on resource " + id + " name " + resource.getName(), e); @@ -1174,32 +1173,32 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { for (int i = 0; i < _cmds.length; i++) { startups[i] = (StartupCommand) _cmds[i]; } - + AgentAttache attache = handleConnectedAgent(_link, startups, _request); if (attache == null) { s_logger.warn("Unable to create attache for agent: " + _request); } } } - + protected void connectAgent(Link link, final Command[] cmds, final Request request) { - //send startupanswer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time, if we put this logic into another thread pool. - StartupAnswer[] answers = new StartupAnswer[cmds.length]; - Command cmd; - for (int i = 0; i < cmds.length; i++) { - cmd = cmds[i]; - if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { - answers[i] = new StartupAnswer((StartupCommand)cmds[i], 0, getPingInterval()); - break; - } - } - Response response = null; - response = new Response(request, answers[0], _nodeId, -1); - try { - link.send(response.toBytes()); - } catch (ClosedChannelException e) { - s_logger.debug("Failed to send startupanswer: " + e.toString()); - } + //send startupanswer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time, if we put this logic into another thread pool. + StartupAnswer[] answers = new StartupAnswer[cmds.length]; + Command cmd; + for (int i = 0; i < cmds.length; i++) { + cmd = cmds[i]; + if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) { + answers[i] = new StartupAnswer((StartupCommand)cmds[i], 0, getPingInterval()); + break; + } + } + Response response = null; + response = new Response(request, answers[0], _nodeId, -1); + try { + link.send(response.toBytes()); + } catch (ClosedChannelException e) { + s_logger.debug("Failed to send startupanswer: " + e.toString()); + } _connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request)); } @@ -1215,14 +1214,14 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { boolean logD = true; if (attache == null) { - if (!(cmd instanceof StartupCommand)) { - s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request); - } else { - //submit the task for execution - request.logD("Scheduling the first command "); - connectAgent(link, cmds, request); - } - return; + if (!(cmd instanceof StartupCommand)) { + s_logger.warn("Throwing away a request because it came through as the first command on a connect: " + request); + } else { + //submit the task for execution + request.logD("Scheduling the first command "); + connectAgent(link, cmds, request); + } + return; } final long hostId = attache.getId(); @@ -1286,20 +1285,20 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { if (cmd instanceof PingRoutingCommand) { boolean gatewayAccessible = ((PingRoutingCommand) cmd).isGatewayAccessible(); HostVO host = _hostDao.findById(Long.valueOf(cmdHostId)); - + if (host != null) { - if (!gatewayAccessible) { - // alert that host lost connection to - // gateway (cannot ping the default route) - DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); - HostPodVO podVO = _podDao.findById(host.getPodId()); - String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); - - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + hostDesc, "Host [" + hostDesc - + "] lost connection to gateway (default route) and is possibly having network connection issues."); - } else { - _alertMgr.clearAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId()); - } + if (!gatewayAccessible) { + // alert that host lost connection to + // gateway (cannot ping the default route) + DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); + HostPodVO podVO = _podDao.findById(host.getPodId()); + String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); + + _alertMgr.sendAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + hostDesc, "Host [" + hostDesc + + "] lost connection to gateway (default route) and is possibly having network connection issues."); + } else { + _alertMgr.clearAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId()); + } } else { s_logger.debug("Not processing " + PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + "; can't find the host in the DB"); @@ -1391,7 +1390,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { protected AgentManagerImpl() { } - @Override + @Override public boolean tapLoadingAgents(Long hostId, TapAgentsAction action) { synchronized (_loadingAgents) { if (action == TapAgentsAction.Add) { @@ -1406,58 +1405,58 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { } return true; } - + @Override public boolean agentStatusTransitTo(HostVO host, Status.Event e, long msId) { - try { - _agentStatusLock.lock(); - if (status_logger.isDebugEnabled()) { - ResourceState state = host.getResourceState(); - StringBuilder msg = new StringBuilder("Transition:"); - msg.append("[Resource state = ").append(state); - msg.append(", Agent event = ").append(e.toString()); - msg.append(", Host id = ").append(host.getId()).append(", name = " + host.getName()).append("]"); - status_logger.debug(msg); - } - - host.setManagementServerId(msId); - try { - return _statusStateMachine.transitTo(host, e, host.getId(), _hostDao); - } catch (NoTransitionException e1) { - status_logger.debug("Cannot transit agent status with event " + e + " for host " + host.getId() + ", name=" + host.getName() - + ", mangement server id is " + msId); - throw new CloudRuntimeException("Cannot transit agent status with event " + e + " for host " + host.getId() + ", mangement server id is " - + msId + "," + e1.getMessage()); - } - } finally { - _agentStatusLock.unlock(); - } + try { + _agentStatusLock.lock(); + if (status_logger.isDebugEnabled()) { + ResourceState state = host.getResourceState(); + StringBuilder msg = new StringBuilder("Transition:"); + msg.append("[Resource state = ").append(state); + msg.append(", Agent event = ").append(e.toString()); + msg.append(", Host id = ").append(host.getId()).append(", name = " + host.getName()).append("]"); + status_logger.debug(msg); + } + + host.setManagementServerId(msId); + try { + return _statusStateMachine.transitTo(host, e, host.getId(), _hostDao); + } catch (NoTransitionException e1) { + status_logger.debug("Cannot transit agent status with event " + e + " for host " + host.getId() + ", name=" + host.getName() + + ", mangement server id is " + msId); + throw new CloudRuntimeException("Cannot transit agent status with event " + e + " for host " + host.getId() + ", mangement server id is " + + msId + "," + e1.getMessage()); + } + } finally { + _agentStatusLock.unlock(); + } } - + public boolean disconnectAgent(HostVO host, Status.Event e, long msId) { host.setDisconnectedOn(new Date()); if (e.equals(Status.Event.Remove)) { host.setGuid(null); host.setClusterId(null); } - + return agentStatusTransitTo(host, e, msId); } - + protected void disconnectWithoutInvestigation(AgentAttache attache, final Status.Event event) { _executor.submit(new DisconnectTask(attache, event, false)); } - + protected void disconnectWithInvestigation(AgentAttache attache, final Status.Event event) { _executor.submit(new DisconnectTask(attache, event, true)); } - + private void disconnectInternal(final long hostId, final Status.Event event, boolean invstigate) { AgentAttache attache = findAttache(hostId); if (attache != null) { if (!invstigate) { - disconnectWithoutInvestigation(attache, event); + disconnectWithoutInvestigation(attache, event); } else { disconnectWithInvestigation(attache, event); } @@ -1470,35 +1469,35 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { HostVO host = _hostDao.findById(hostId); if (host != null && host.getRemoved() == null) { - disconnectAgent(host, event, _nodeId); + disconnectAgent(host, event, _nodeId); } } } - + public void disconnectWithInvestigation(final long hostId, final Status.Event event) { disconnectInternal(hostId, event, true); } - + @Override public void disconnectWithoutInvestigation(final long hostId, final Status.Event event) { disconnectInternal(hostId, event, false); } - @Override + @Override public AgentAttache handleDirectConnectAgent(HostVO host, StartupCommand[] cmds, ServerResource resource, boolean forRebalance) throws ConnectionException { - AgentAttache attache; - - attache = createAttacheForDirectConnect(host, resource); + AgentAttache attache; + + attache = createAttacheForDirectConnect(host, resource); StartupAnswer[] answers = new StartupAnswer[cmds.length]; for (int i = 0; i < answers.length; i++) { answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval); } attache.process(answers); - attache = notifyMonitorsOfConnection(attache, cmds, forRebalance); - - return attache; + attache = notifyMonitorsOfConnection(attache, cmds, forRebalance); + + return attache; } - + @Override public void pullAgentToMaintenance(long hostId) { AgentAttache attache = findAttache(hostId); @@ -1508,15 +1507,15 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager { attache.cancelAllCommands(Status.Disconnected, false); } } - + @Override public void pullAgentOutMaintenance(long hostId) { AgentAttache attache = findAttache(hostId); if (attache != null) { - attache.setMaintenanceMode(false); + attache.setMaintenanceMode(false); } } - - - + + + } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 6753b28..dbd7cd3 100755 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -80,8 +80,6 @@ import com.cloud.resource.ServerResource; import com.cloud.storage.resource.DummySecondaryStorageResource; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; -import com.cloud.utils.component.Adapters; -import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.SearchCriteria2; @@ -116,10 +114,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust protected ManagementServerHostDao _mshostDao; @Inject protected HostTransferMapDao _hostTransferDao; - + // @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class) @Inject protected List _lbPlanners; - + @Inject protected AgentManager _agentMgr; @Inject ConfigurationDao _configDao; @@ -133,7 +131,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust _peers = new HashMap(7); _sslEngines = new HashMap(7); _nodeId = _clusterMgr.getManagementNodeId(); - + s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId); Map params = _configDao.getConfiguration(xmlParams); @@ -143,7 +141,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust ClusteredAgentAttache.initialize(this); _clusterMgr.registerListener(this); - + return super.configure(name, xmlParams); } @@ -177,7 +175,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize, _nodeId); List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); hosts.addAll(appliances); - + if (hosts != null && hosts.size() > 0) { s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them..."); for (HostVO host : hosts) { @@ -278,12 +276,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState) { return handleDisconnect(attache, event, false, true); } - + @Override protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) { return handleDisconnect(attache, event, true, true); } - + protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast) { boolean res; if (!investigate) { @@ -292,14 +290,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust res = super.handleDisconnectWithInvestigation(agent, event); } - if (res) { - if (broadcast) { - notifyNodesInCluster(agent); - } - return true; - } else { - return false; - } + if (res) { + if (broadcast) { + notifyNodesInCluster(agent); + } + return true; + } else { + return false; + } } @Override @@ -343,15 +341,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public boolean reconnect(final long hostId) { Boolean result; try { - result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested); - if (result != null) { - return result; - } + result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested); + if (result != null) { + return result; + } } catch (AgentUnavailableException e) { - s_logger.debug("cannot propagate agent reconnect because agent is not available", e); - return false; + s_logger.debug("cannot propagate agent reconnect because agent is not available", e); + return false; } - + return super.reconnect(hostId); } @@ -413,7 +411,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public String findPeer(long hostId) { return _clusterMgr.getPeerName(hostId); } - + public SSLEngine getSSLEngine(String peerName) { return _sslEngines.get(peerName); } @@ -520,7 +518,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } if (agent == null) { - AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); + AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); ex.addProxyObject(ApiDBUtils.findHostById(hostId).getUuid()); throw ex; } @@ -540,11 +538,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } _timer.cancel(); - + //cancel all transfer tasks s_transferExecutor.shutdownNow(); cleanupTransferMap(_nodeId); - + return super.stop(); } @@ -698,19 +696,19 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException { - boolean result = false; + boolean result = false; if (event == Event.RequestAgentRebalance) { return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId); } else if (event == Event.StartAgentRebalance) { try { - result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); + result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); } catch (Exception e) { s_logger.warn("Unable to rebalance host id=" + agentId, e); } } return result; } - + @Override public void scheduleRebalanceAgents() { _timer.schedule(new AgentLoadBalancerTask(), 30000); @@ -735,20 +733,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public synchronized void run() { - try { - if (!cancelled) { - startRebalanceAgents(); - if (s_logger.isInfoEnabled()) { - s_logger.info("The agent load balancer task is now being cancelled"); - } - cancelled = true; - } - } catch(Throwable e) { - s_logger.error("Unexpected exception " + e.toString(), e); - } + try { + if (!cancelled) { + startRebalanceAgents(); + if (s_logger.isInfoEnabled()) { + s_logger.info("The agent load balancer task is now being cancelled"); + } + cancelled = true; + } + } catch(Throwable e) { + s_logger.error("Unexpected exception " + e.toString(), e); + } } } - + public void startRebalanceAgents() { s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents"); List allMS = _mshostDao.listBy(ManagementServerHost.State.Up); @@ -767,7 +765,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } return; } - + if (avLoad == 0L) { if (s_logger.isDebugEnabled()) { s_logger.debug("As calculated average load is less than 1, rounding it to 1"); @@ -777,7 +775,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust for (ManagementServerHostVO node : allMS) { if (node.getMsid() != _nodeId) { - + List hostsToRebalance = new ArrayList(); for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); @@ -788,14 +786,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } - + if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid()); for (HostVO host : hostsToRebalance) { long hostId = host.getId(); s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId); boolean result = true; - + if (_hostTransferDao.findById(hostId) != null) { s_logger.warn("Somebody else is already rebalancing host id: " + hostId); continue; @@ -867,7 +865,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust for (Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { Long hostId = iterator.next(); AgentAttache attache = findAttache(hostId); - + // if the thread: // 1) timed out waiting for the host to reconnect // 2) recipient management server is not active any more @@ -883,14 +881,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust _hostTransferDao.completeAgentTransfer(hostId); continue; } - + if (transferMap.getInitialOwner() != _nodeId || attache == null || attache.forForward()) { s_logger.debug("Management server " + _nodeId + " doesn't own host id=" + hostId + " any more, skipping rebalance for the host"); iterator.remove(); _hostTransferDao.completeAgentTransfer(hostId); continue; } - + ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); if (ms != null && ms.getState() != ManagementServerHost.State.Up) { s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host"); @@ -898,7 +896,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust _hostTransferDao.completeAgentTransfer(hostId); continue; } - + if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { iterator.remove(); try { @@ -907,7 +905,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution"); continue; } - + } else { s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize()); } @@ -925,16 +923,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } }; } - - + + private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) { s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer"); synchronized (_agentToTransferIds) { return _agentToTransferIds.add(hostId); } } - - + + protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException{ boolean result = true; @@ -954,7 +952,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex); result = false; } - + if (result) { s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId); finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted); @@ -962,7 +960,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId); finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); } - + } else if (futureOwnerId == _nodeId) { HostVO host = _hostDao.findById(hostId); try { @@ -977,9 +975,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (result) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); - } - result = loadDirectlyConnectedHost(host, true); + s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); + } + result = loadDirectlyConnectedHost(host, true); } else { s_logger.warn("Failed to disconnect " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification"); @@ -989,7 +987,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process due to:", ex); result = false; } - + if (result) { s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId + " as a part of rebalance process"); } else { @@ -999,7 +997,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return result; } - + protected void finishRebalance(final long hostId, long futureOwnerId, Event event){ @@ -1007,21 +1005,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (s_logger.isDebugEnabled()) { s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event); } - + AgentAttache attache = findAttache(hostId); if (attache == null || !(attache instanceof ClusteredAgentAttache)) { s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already"); _hostTransferDao.completeAgentTransfer(hostId); return; } - + ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; - + if (success) { //1) Set transfer mode to false - so the agent can start processing requests normally forwardAttache.setTransferMode(false); - + //2) Get all transfer requests and route them to peer Request requestToTransfer = forwardAttache.getRequestToTransfer(); while (requestToTransfer != null) { @@ -1030,20 +1028,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (!routeResult) { logD(requestToTransfer.getBytes(), "Failed to route request to peer"); } - + requestToTransfer = forwardAttache.getRequestToTransfer(); } - + s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance to " + futureOwnerId); - + } else { failRebalance(hostId); } - + s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance"); _hostTransferDao.completeAgentTransfer(hostId); } - + protected void failRebalance(final long hostId){ try { s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId); @@ -1053,19 +1051,19 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup"); } } - + protected boolean startRebalance(final long hostId) { HostVO host = _hostDao.findById(hostId); - + if (host == null || host.getRemoved() != null) { s_logger.warn("Unable to find host record, fail start rebalancing process"); return false; } - + synchronized (_agents) { ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { - handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true); + handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true); ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); if (forwardAttache == null) { s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process"); @@ -1086,27 +1084,27 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust _hostTransferDao.startAgentTransfer(hostId); return true; } - + protected void cleanupTransferMap(long msId) { List hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); - + for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { _hostTransferDao.remove(hostJoingingCluster.getId()); } - + List hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { _hostTransferDao.remove(hostLeavingCluster.getId()); } } - - + + protected class RebalanceTask implements Runnable { Long hostId = null; Long currentOwnerId = null; Long futureOwnerId = null; - - + + public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) { this.hostId = hostId; this.currentOwnerId = currentOwnerId; @@ -1127,5 +1125,5 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } } - + } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f40e7b75/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java b/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java index 9951896..c8bbe02 100755 --- a/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java +++ b/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java @@ -26,14 +26,12 @@ import javax.inject.Inject; import org.springframework.stereotype.Component; import com.cloud.agent.manager.allocator.HostAllocator; -import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.deploy.DeploymentPlan; import com.cloud.deploy.DeploymentPlanner.ExcludeList; import com.cloud.host.Host; import com.cloud.host.Host.Type; import com.cloud.host.dao.HostDao; import com.cloud.offering.ServiceOffering; -import com.cloud.utils.component.ComponentLocator; import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachineProfile; @@ -51,19 +49,19 @@ public class TestingAllocator implements HostAllocator { ExcludeList avoid, int returnUpTo) { return allocateTo(vmProfile, plan, type, avoid, returnUpTo, true); } - + @Override public List allocateTo(VirtualMachineProfile vmProfile, DeploymentPlan plan, Type type, - ExcludeList avoid, int returnUpTo, boolean considerReservedCapacity) { - List availableHosts = new ArrayList(); - Host host = null; + ExcludeList avoid, int returnUpTo, boolean considerReservedCapacity) { + List availableHosts = new ArrayList(); + Host host = null; if (type == Host.Type.Routing && _routingHost != null) { - host = _hostDao.findById(_routingHost); + host = _hostDao.findById(_routingHost); } else if (type == Host.Type.Storage && _storageHost != null) { - host = _hostDao.findById(_storageHost); + host = _hostDao.findById(_storageHost); } if(host != null){ - availableHosts.add(host); + availableHosts.add(host); } return availableHosts; } @@ -82,9 +80,9 @@ public class TestingAllocator implements HostAllocator { value = (String)params.get(Host.Type.Storage.toString()); _storageHost = (value != null) ? Long.parseLong(value) : null; - + _name = name; - + return true; }