Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1553210AF3 for ; Fri, 17 Jan 2014 22:41:38 +0000 (UTC) Received: (qmail 1727 invoked by uid 500); 17 Jan 2014 22:40:34 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 1232 invoked by uid 500); 17 Jan 2014 22:40:19 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 535 invoked by uid 99); 17 Jan 2014 22:40:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jan 2014 22:40:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3364E379B8; Fri, 17 Jan 2014 22:40:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mchen@apache.org To: commits@cloudstack.apache.org Date: Fri, 17 Jan 2014 22:40:21 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/50] [abbrv] git commit: updated refs/heads/rbac to 929fbab CLOUDSTACK-5696: Fix sync issue with out-of-band changes Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7164fc6e Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7164fc6e Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7164fc6e Branch: refs/heads/rbac Commit: 7164fc6e738137b452e89f8889a9cd3f3bdb3c29 Parents: 9aaea28 Author: Kelven Yang Authored: Tue Jan 14 17:32:52 2014 -0800 Committer: Kelven Yang Committed: Wed Jan 15 13:11:02 2014 -0800 ---------------------------------------------------------------------- .../com/cloud/vm/VirtualMachineManagerImpl.java | 16 ++- .../cloud/vm/VirtualMachinePowerStateSync.java | 2 + .../vm/VirtualMachinePowerStateSyncImpl.java | 21 ++-- .../src/com/cloud/vm/dao/VMInstanceDao.java | 2 + .../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 117 +++++++++++-------- .../framework/messagebus/MessageDispatcher.java | 62 ++++++++-- 6 files changed, 150 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 628528a..9894d31 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -61,6 +61,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.jobs.JobInfo; import org.apache.cloudstack.managed.context.ManagedContextRunnable; @@ -578,6 +579,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _agentMgr.registerForHostEvents(this, true, true, true); + if (VmJobEnabled.value()) { + _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this)); + } + return true; } @@ -3816,7 +3821,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // @MessageHandler(topic = Topics.VM_POWER_STATE) - private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) { + private void HandlePowerStateReport(String subject, String senderAddress, Object args) { assert (args != null); Long vmId = (Long)args; @@ -3836,7 +3841,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac break; // PowerUnknown shouldn't be reported, it is a derived - // VM power state from host state (host un-reachable + // VM power state from host state (host un-reachable) case PowerUnknown: default: assert (false); @@ -3846,8 +3851,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); } } else { - // TODO, do job wake-up signalling, since currently async job wake-up is not in use - // we will skip it for nows + // reset VM power state tracking so that we won't lost signal when VM has + // been translated to + _vmDao.resetVmPowerStateTracking(vmId); } } @@ -3924,6 +3930,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac switch (vm.getState()) { case Starting: case Stopping: + case Running: case Stopped: case Migrating: try { @@ -3937,7 +3944,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO: we need to forcely release all resource allocation break; - case Running: case Destroyed: case Expunging: break; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java index f84c7b7..152d0d8 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java @@ -28,4 +28,6 @@ public interface VirtualMachinePowerStateSync { // to adapt legacy ping report void processHostVmStatePingReport(long hostId, Map report); + + Map convertVmStateReport(Map states); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java index cd4c3c0..453890c 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java @@ -32,12 +32,9 @@ import com.cloud.vm.dao.VMInstanceDao; public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync { private static final Logger s_logger = Logger.getLogger(VirtualMachinePowerStateSyncImpl.class); - @Inject - MessageBus _messageBus; - @Inject - VMInstanceDao _instanceDao; - @Inject - VirtualMachineManager _vmMgr; + @Inject MessageBus _messageBus; + @Inject VMInstanceDao _instanceDao; + @Inject VirtualMachineManager _vmMgr; public VirtualMachinePowerStateSyncImpl() { } @@ -53,7 +50,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat if (s_logger.isDebugEnabled()) s_logger.debug("Process host VM state report from ping process. host: " + hostId); - Map translatedInfo = convertToInfos(report); + Map translatedInfo = convertVmStateReport(report); processReport(hostId, translatedInfo); } @@ -62,7 +59,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat if (s_logger.isDebugEnabled()) s_logger.debug("Process host VM state report from ping process. host: " + hostId); - Map translatedInfo = convertToInfos(report); + Map translatedInfo = convertVmStateReport(report); processReport(hostId, translatedInfo); } @@ -74,16 +71,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) { - if (s_logger.isDebugEnabled()) s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); _messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey()); + } else { + if (s_logger.isDebugEnabled()) + s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey()); } } } - private Map convertToInfos(Map states) { + @Override + public Map convertVmStateReport(Map states) { final HashMap map = new HashMap(); if (states == null) { return map; @@ -93,7 +93,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat VMInstanceVO vm = findVM(entry.getKey()); if (vm != null) { map.put(vm.getId(), entry.getValue().getState()); - break; } else { s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey()); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java index e6ea4a5..453d222 100644 --- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java +++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java @@ -69,6 +69,8 @@ public interface VMInstanceDao extends GenericDao, StateDao< List findVMInTransition(Date time, State... states); + List listByHostAndState(long hostId, State... states); + List listByTypes(VirtualMachine.Type... types); VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java index 605ece3..2f25f57 100644 --- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java +++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java @@ -48,7 +48,11 @@ import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallback; +import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.db.UpdateBuilder; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.vm.NicVO; @@ -76,6 +80,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase implem protected SearchBuilder TypesSearch; protected SearchBuilder IdTypesSearch; protected SearchBuilder HostIdTypesSearch; + protected SearchBuilder HostIdStatesSearch; protected SearchBuilder HostIdUpTypesSearch; protected SearchBuilder HostUpSearch; protected SearchBuilder InstanceNameSearch; @@ -182,6 +187,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase implem HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN); HostIdTypesSearch.done(); + HostIdStatesSearch = createSearchBuilder(); + HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ); + HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN); + HostIdStatesSearch.done(); + HostIdUpTypesSearch = createSearchBuilder(); HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ); HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN); @@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase implem } @Override + public List listByHostAndState(long hostId, State... states) { + SearchCriteria sc = HostIdStatesSearch.create(); + sc.setParameters("hostId", hostId); + sc.setParameters("states", (Object[])states); + + return listBy(sc); + } + + @Override public List listUpByHostIdTypes(long hostid, Type... types) { SearchCriteria sc = HostIdUpTypesSearch.create(); sc.setParameters("hostid", hostid); @@ -702,60 +721,66 @@ public class VMInstanceDaoImpl extends GenericDaoBase implem } @Override - public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) { - boolean needToUpdate = false; - TransactionLegacy txn = TransactionLegacy.currentTxn(); - txn.start(); - - VMInstanceVO instance = findById(instanceId); - if (instance != null) { - Long savedPowerHostId = instance.getPowerHostId(); - if (instance.getPowerState() != powerState || savedPowerHostId == null || savedPowerHostId.longValue() != powerHostId) { - instance.setPowerState(powerState); - instance.setPowerHostId(powerHostId); - instance.setPowerStateUpdateCount(1); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - needToUpdate = true; - update(instanceId, instance); - } else { - // to reduce DB updates, consecutive same state update for more than 3 times - if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) { - instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - needToUpdate = true; - update(instanceId, instance); + public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) { + return Transaction.execute(new TransactionCallback() { + @Override + public Boolean doInTransaction(TransactionStatus status) { + boolean needToUpdate = false; + VMInstanceVO instance = findById(instanceId); + if (instance != null) { + Long savedPowerHostId = instance.getPowerHostId(); + if (instance.getPowerState() != powerState || savedPowerHostId == null + || savedPowerHostId.longValue() != powerHostId) { + instance.setPowerState(powerState); + instance.setPowerHostId(powerHostId); + instance.setPowerStateUpdateCount(1); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + needToUpdate = true; + update(instanceId, instance); + } else { + // to reduce DB updates, consecutive same state update for more than 3 times + if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) { + instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + needToUpdate = true; + update(instanceId, instance); + } + } } + return needToUpdate; } - } - - txn.commit(); - return needToUpdate; + }); } @Override - public void resetVmPowerStateTracking(long instanceId) { - TransactionLegacy txn = TransactionLegacy.currentTxn(); - txn.start(); - VMInstanceVO instance = findById(instanceId); - if (instance != null) { - instance.setPowerStateUpdateCount(0); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - update(instanceId, instance); - } - - txn.commit(); + public void resetVmPowerStateTracking(final long instanceId) { + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VMInstanceVO instance = findById(instanceId); + if (instance != null) { + instance.setPowerStateUpdateCount(0); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + update(instanceId, instance); + } + } + }); } - @Override - @DB - public void resetHostPowerStateTracking(long hostId) { - SearchCriteria sc = createSearchCriteria(); - sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId); + @Override @DB + public void resetHostPowerStateTracking(final long hostId) { + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + SearchCriteria sc = createSearchCriteria(); + sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId); - VMInstanceVO instance = this.createForUpdate(); - instance.setPowerStateUpdateCount(0); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + VMInstanceVO instance = createForUpdate(); + instance.setPowerStateUpdateCount(0); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - this.update(instance, sc); + update(instance, sc); + } + }); } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java index a2d9a7b..e93bbc2 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java @@ -20,17 +20,24 @@ package org.apache.cloudstack.framework.messagebus; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + public class MessageDispatcher implements MessageSubscriber { - private static Map, Method> s_handlerCache = new HashMap, Method>(); + private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class); + + private static Map, List> s_handlerCache = new HashMap, List>(); private static Map s_targetMap = new HashMap(); private Object _targetObject; public MessageDispatcher(Object targetObject) { _targetObject = targetObject; + buildHandlerMethodCache(targetObject.getClass()); } @Override @@ -67,10 +74,13 @@ public class MessageDispatcher implements MessageSubscriber { try { handler.invoke(target, subject, senderAddress, args); } catch (IllegalArgumentException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject); } catch (IllegalAccessException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject); } catch (InvocationTargetException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject); } @@ -79,18 +89,18 @@ public class MessageDispatcher implements MessageSubscriber { public static Method resolveHandler(Class handlerClz, String subject) { synchronized (s_handlerCache) { - Method handler = s_handlerCache.get(handlerClz); - if (handler != null) - return handler; + List handlerList = s_handlerCache.get(handlerClz); + if (handlerList != null) { + for (Method method : handlerList) { + MessageHandler annotation = method.getAnnotation(MessageHandler.class); + assert (annotation != null); - for (Method method : handlerClz.getMethods()) { - MessageHandler annotation = method.getAnnotation(MessageHandler.class); - if (annotation != null) { if (match(annotation.topic(), subject)) { - s_handlerCache.put(handlerClz, method); return method; } } + } else { + s_logger.error("Handler class " + handlerClz.getName() + " is not registered"); } } @@ -100,4 +110,40 @@ public class MessageDispatcher implements MessageSubscriber { private static boolean match(String expression, String param) { return param.matches(expression); } + + private void buildHandlerMethodCache(Class handlerClz) { + if (s_logger.isInfoEnabled()) + s_logger.info("Build message handler cache for " + handlerClz.getName()); + + synchronized (s_handlerCache) { + List handlerList = s_handlerCache.get(handlerClz); + if (handlerList == null) { + handlerList = new ArrayList(); + s_handlerCache.put(handlerClz, handlerList); + + Class clz = handlerClz; + while (clz != null && clz != Object.class) { + for (Method method : clz.getDeclaredMethods()) { + MessageHandler annotation = method.getAnnotation(MessageHandler.class); + if (annotation != null) { + // allow private member access via reflection + method.setAccessible(true); + handlerList.add(method); + + if (s_logger.isInfoEnabled()) + s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache"); + } + } + + clz = clz.getSuperclass(); + } + } else { + if (s_logger.isInfoEnabled()) + s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache"); + } + } + + if (s_logger.isInfoEnabled()) + s_logger.info("Done building message handler cache for " + handlerClz.getName()); + } }