cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/4.3 to 7a8c0e3
Date Wed, 15 Jan 2014 01:33:56 GMT
Updated Branches:
  refs/heads/4.3 45065c712 -> 7a8c0e3ae


CLOUDSTACK-5696: Fix sync issue with out-of-band changes


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7a8c0e3a
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7a8c0e3a
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7a8c0e3a

Branch: refs/heads/4.3
Commit: 7a8c0e3ae05fb22894c1ea3b61e77a1b8d8ec32b
Parents: 45065c7
Author: Kelven Yang <kelveny@gmail.com>
Authored: Tue Jan 14 17:32:52 2014 -0800
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Tue Jan 14 17:33:15 2014 -0800

----------------------------------------------------------------------
 .../com/cloud/vm/VirtualMachineManagerImpl.java |  16 ++-
 .../cloud/vm/VirtualMachinePowerStateSync.java  |  10 +-
 .../vm/VirtualMachinePowerStateSyncImpl.java    |  36 ++---
 .../src/com/cloud/vm/dao/VMInstanceDao.java     |   6 +-
 .../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 140 +++++++++++--------
 .../framework/messagebus/MessageDispatcher.java |  97 +++++++++----
 6 files changed, 193 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 17f0936..af65ac6 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -62,6 +62,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
 import org.apache.cloudstack.framework.messagebus.MessageHandler;
 import org.apache.cloudstack.jobs.JobInfo;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
@@ -575,6 +576,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
         _agentMgr.registerForHostEvents(this, true, true, true);
 
+        if (VmJobEnabled.value()) {
+            _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
+        }
+
         return true;
     }
 
@@ -3815,7 +3820,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     //
 
     @MessageHandler(topic = Topics.VM_POWER_STATE)
-    private void HandlePownerStateReport(Object target, String subject, String senderAddress,
Object args) {
+    private void HandlePowerStateReport(String subject, String senderAddress, Object args)
{
     	assert(args != null);
     	Long vmId = (Long)args;
 
@@ -3835,7 +3840,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     				break;
 
     			// PowerUnknown shouldn't be reported, it is a derived
-    			// VM power state from host state (host un-reachable
+                // VM power state from host state (host un-reachable)
     			case PowerUnknown :
     			default :
     				assert(false);
@@ -3845,8 +3850,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     			s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
     		}
     	} else {
-    		// TODO, do job wake-up signalling, since currently async job wake-up is not in use
-    		// we will skip it for nows
+            // reset VM power state tracking so that we won't lost signal when VM has
+            // been translated to
+            _vmDao.resetVmPowerStateTracking(vmId);
     	}
     }
 
@@ -3921,6 +3927,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     	switch(vm.getState()) {
     	case Starting :
     	case Stopping :
+        case Running:
     	case Stopped :
     	case Migrating :
     		try {
@@ -3933,7 +3940,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
       		// TODO: we need to forcely release all resource allocation
           	break;
 
-    	case Running :
     	case Destroyed :
     	case Expunging :
     		break;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index dacc8d0..d7aef72 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -5,7 +5,7 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
@@ -21,11 +21,13 @@ import java.util.Map;
 import com.cloud.agent.api.HostVmStateReportEntry;
 
 public interface VirtualMachinePowerStateSync {
-	
+
 	void resetHostSyncState(long hostId);
-	
+
 	void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry> report);
-	
+
 	// to adapt legacy ping report
 	void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry>
report);
+
+    Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry>
states);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
index 9aa9501..9a0119c 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java
@@ -35,22 +35,22 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
     @Inject MessageBus _messageBus;
     @Inject VMInstanceDao _instanceDao;
     @Inject VirtualMachineManager _vmMgr;
-    
+
     public VirtualMachinePowerStateSyncImpl() {
     }
-    
+
     @Override
 	public void resetHostSyncState(long hostId) {
     	s_logger.info("Reset VM power state sync for host: " + hostId);
     	_instanceDao.resetHostPowerStateTracking(hostId);
     }
-    
+
     @Override
 	public void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry>
report) {
     	if(s_logger.isDebugEnabled())
     		s_logger.debug("Process host VM state report from ping process. host: " + hostId);
-    	
-    	Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+
+    	Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
     	processReport(hostId, translatedInfo);
     }
 
@@ -58,39 +58,41 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
 	public void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry>
report) {
     	if(s_logger.isDebugEnabled())
     		s_logger.debug("Process host VM state report from ping process. host: " + hostId);
-    	
-    	Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report);
+
+    	Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report);
     	processReport(hostId, translatedInfo);
     }
-    
+
     private void processReport(long hostId, Map<Long, VirtualMachine.PowerState> translatedInfo)
{
-    	
+
     	for(Map.Entry<Long, VirtualMachine.PowerState> entry : translatedInfo.entrySet())
{
-    		
+
         	if(s_logger.isDebugEnabled())
         		s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey()
+ ", power state: " + entry.getValue());
 
     		if(_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) {
-    			
             	if(s_logger.isDebugEnabled())
             		s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: "
+ entry.getKey() + ", power state: " + entry.getValue());
-    			
+
                 _messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL,
entry.getKey());
+            } else {
+                if (s_logger.isDebugEnabled())
+                    s_logger.debug("VM power state does not change, skip DB writing. vm id:
" + entry.getKey());
     		}
     	}
     }
-    	    
-    private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry>
states) {
+
+    @Override
+    public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String,
HostVmStateReportEntry> states) {
         final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>();
         if (states == null) {
             return map;
         }
-        
+
         for (Map.Entry<String, HostVmStateReportEntry> entry : states.entrySet()) {
         	VMInstanceVO vm = findVM(entry.getKey());
         	if(vm != null) {
         		map.put(vm.getId(), entry.getValue().getState());
-        		break;
         	} else {
         		s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey());
         	}
@@ -98,7 +100,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat
 
         return map;
     }
-    
+
     private VMInstanceVO findVM(String vmName) {
         return _instanceDao.findVMByInstanceName(vmName);
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
index 830dea8..78c6e8c 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java
@@ -70,6 +70,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>,
StateDao<
 
     List<VMInstanceVO> findVMInTransition(Date time, State... states);
 
+    List<VMInstanceVO> listByHostAndState(long hostId, State... states);
+
     List<VMInstanceVO> listByTypes(VirtualMachine.Type... types);
 
     VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types);
@@ -123,8 +125,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>,
StateDao<
     List<VMInstanceVO> listStartingWithNoHostId();
 
     boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState
powerState);
-    
+
     void resetVmPowerStateTracking(long instanceId);
-    
+
     void resetHostPowerStateTracking(long hostId);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
index e7f907e..cc747bc 100644
--- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
+++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java
@@ -49,7 +49,11 @@ import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Func;
 import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.db.UpdateBuilder;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.NicVO;
@@ -65,7 +69,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long>
implem
 
     public static final Logger s_logger = Logger.getLogger(VMInstanceDaoImpl.class);
     private static final int MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT = 3;
-    
+
     protected SearchBuilder<VMInstanceVO> VMClusterSearch;
     protected SearchBuilder<VMInstanceVO> LHVMClusterSearch;
     protected SearchBuilder<VMInstanceVO> IdStatesSearch;
@@ -77,6 +81,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long>
implem
     protected SearchBuilder<VMInstanceVO> TypesSearch;
     protected SearchBuilder<VMInstanceVO> IdTypesSearch;
     protected SearchBuilder<VMInstanceVO> HostIdTypesSearch;
+    protected SearchBuilder<VMInstanceVO> HostIdStatesSearch;
     protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch;
     protected SearchBuilder<VMInstanceVO> HostUpSearch;
     protected SearchBuilder<VMInstanceVO> InstanceNameSearch;
@@ -180,6 +185,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO,
Long> implem
         HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN);
         HostIdTypesSearch.done();
 
+        HostIdStatesSearch = createSearchBuilder();
+        HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ);
+        HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN);
+        HostIdStatesSearch.done();
+
         HostIdUpTypesSearch = createSearchBuilder();
         HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ);
         HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN);
@@ -230,7 +240,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO,
Long> implem
 
         _updateTimeAttr = _allAttributes.get("updateTime");
         assert _updateTimeAttr != null : "Couldn't get this updateTime attribute";
-        
+
         SearchBuilder<NicVO> nicSearch = _nicDao.createSearchBuilder();
         nicSearch.and("networkId", nicSearch.entity().getNetworkId(), SearchCriteria.Op.EQ);
 
@@ -242,7 +252,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO,
Long> implem
         DistinctHostNameSearch.join("nicSearch", nicSearch, DistinctHostNameSearch.entity().getId(),
                 nicSearch.entity().getInstanceId(), JoinBuilder.JoinType.INNER);
         DistinctHostNameSearch.done();
-        
+
     }
 
     @Override
@@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO,
Long> implem
     }
 
     @Override
+    public List<VMInstanceVO> listByHostAndState(long hostId, State... states) {
+        SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create();
+        sc.setParameters("hostId", hostId);
+        sc.setParameters("states", (Object[])states);
+
+        return listBy(sc);
+    }
+
+    @Override
     public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) {
         SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create();
         sc.setParameters("hostid", hostid);
@@ -679,63 +698,68 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO,
Long> implem
         sc.setParameters("state", State.Starting);
         return listBy(sc);
     }
-    
-    @Override
-    public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState
powerState) {
-    	boolean needToUpdate = false;
-    	TransactionLegacy txn = TransactionLegacy.currentTxn();
-    	txn.start();
-         
-        VMInstanceVO instance = findById(instanceId);
-       if(instance != null) {
-           Long savedPowerHostId = instance.getPowerHostId();
-           if(instance.getPowerState() != powerState || savedPowerHostId == null 
-        		   || savedPowerHostId.longValue() != powerHostId) {
-               instance.setPowerState(powerState);
-               instance.setPowerHostId(powerHostId);
-               instance.setPowerStateUpdateCount(1);
-               instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-               needToUpdate = true;
-               update(instanceId, instance);
-           } else {
-               // to reduce DB updates, consecutive same state update for more than 3 times
-               if(instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT)
{
-	               instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1);
-	               instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-	               needToUpdate = true;
-	               update(instanceId, instance);
-               }
-           }
-        }
-        
-        txn.commit();
-        return needToUpdate;
-    }
-    
+
     @Override
-    public void resetVmPowerStateTracking(long instanceId) {
-    	TransactionLegacy txn = TransactionLegacy.currentTxn();
-        txn.start();
-        VMInstanceVO instance = findById(instanceId);
-        if(instance != null) {
-               instance.setPowerStateUpdateCount(0);
-               instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-               update(instanceId, instance);
-        }
-        
-        txn.commit();
+    public boolean updatePowerState(final long instanceId, final long powerHostId, final
VirtualMachine.PowerState powerState) {
+        return Transaction.execute(new TransactionCallback<Boolean>() {
+    	    @Override
+            public Boolean doInTransaction(TransactionStatus status) {
+    	        boolean needToUpdate = false;
+    	        VMInstanceVO instance = findById(instanceId);
+    	        if(instance != null) {
+    	            Long savedPowerHostId = instance.getPowerHostId();
+    	            if(instance.getPowerState() != powerState || savedPowerHostId == null
+    	                    || savedPowerHostId.longValue() != powerHostId) {
+    	                instance.setPowerState(powerState);
+    	                instance.setPowerHostId(powerHostId);
+    	                instance.setPowerStateUpdateCount(1);
+    	                instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+    	                needToUpdate = true;
+    	                update(instanceId, instance);
+    	            } else {
+    	                // to reduce DB updates, consecutive same state update for more than
3 times
+    	                if(instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT)
{
+    	                    instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount()
+ 1);
+    	                    instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+    	                    needToUpdate = true;
+    	                    update(instanceId, instance);
+    	                }
+    	            }
+    	        }
+    	        return needToUpdate;
+    	    }
+        });
+    }
+
+    @Override
+    public void resetVmPowerStateTracking(final long instanceId) {
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                VMInstanceVO instance = findById(instanceId);
+                if (instance != null) {
+                    instance.setPowerStateUpdateCount(0);
+                    instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+                    update(instanceId, instance);
+                }
+            }
+        });
     }
-    
-    
+
     @Override @DB
-    public void resetHostPowerStateTracking(long hostId) {
-       SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
-       sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
-       
-       VMInstanceVO instance = this.createForUpdate();
-       instance.setPowerStateUpdateCount(0);
-       instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
-       
-       this.update(instance, sc);
+    public void resetHostPowerStateTracking(final long hostId) {
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                SearchCriteria<VMInstanceVO> sc = createSearchCriteria();
+                sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId);
+
+                VMInstanceVO instance = createForUpdate();
+                instance.setPowerStateUpdateCount(0);
+                instance.setPowerStateUpdateTime(DateUtil.currentGMTTime());
+
+                update(instance, sc);
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7a8c0e3a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
index ac75afb..e83c5ee 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java
@@ -20,25 +20,31 @@ package org.apache.cloudstack.framework.messagebus;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
 
 public class MessageDispatcher implements MessageSubscriber {
-	private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>,
Method>();
-	
+    private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class);
+
+    private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>,
List<Method>>();
+
 	private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object,
MessageDispatcher>();
 	private Object _targetObject;
-	
+
 	public MessageDispatcher(Object targetObject) {
 		_targetObject = targetObject;
+        buildHandlerMethodCache(targetObject.getClass());
 	}
-	
+
 	@Override
 	public void onPublishMessage(String senderAddress, String subject, Object args) {
 		dispatch(_targetObject, subject, senderAddress, args);
 	}
-	
+
 	public static MessageDispatcher getDispatcher(Object targetObject) {
 		MessageDispatcher dispatcher;
 		synchronized(s_targetMap) {
@@ -50,55 +56,94 @@ public class MessageDispatcher implements MessageSubscriber {
 		}
 		return dispatcher;
 	}
-	
+
 	public static void removeDispatcher(Object targetObject) {
 		synchronized(s_targetMap) {
 			s_targetMap.remove(targetObject);
 		}
 	}
-	
+
 	public static boolean dispatch(Object target, String subject, String senderAddress, Object
args) {
 		assert(subject != null);
 		assert(target != null);
-		
+
 		Method handler = resolveHandler(target.getClass(), subject);
 		if(handler == null)
 			return false;
-		
+
 		try {
 			handler.invoke(target, subject, senderAddress, args);
 		} catch (IllegalArgumentException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName()
+ "." + handler.getName(), e);
 			throw new RuntimeException("IllegalArgumentException when invoking event handler for subject:
" + subject);
 		} catch (IllegalAccessException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName()
+ "." + handler.getName(), e);
 			throw new RuntimeException("IllegalAccessException when invoking event handler for subject:
" + subject);
 		} catch (InvocationTargetException e) {
+            s_logger.error("Unexpected exception when calling " + target.getClass().getName()
+ "." + handler.getName(), e);
 			throw new RuntimeException("InvocationTargetException when invoking event handler for
subject: " + subject);
 		}
-		
+
 		return true;
 	}
-	
+
 	public static Method resolveHandler(Class<?> handlerClz, String subject) {
 		synchronized(s_handlerCache) {
-			Method handler = s_handlerCache.get(handlerClz);
-			if(handler != null)
-				return handler;
-			
-			for(Method method : handlerClz.getMethods()) {
-				MessageHandler annotation = method.getAnnotation(MessageHandler.class);
-				if(annotation != null) {
-					if(match(annotation.topic(), subject)) {
-						s_handlerCache.put(handlerClz, method);
-						return method;
-					}
-				}
-			}
+            List<Method> handlerList = s_handlerCache.get(handlerClz);
+            if (handlerList != null) {
+                for (Method method : handlerList) {
+                    MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+                    assert (annotation != null);
+
+                    if (match(annotation.topic(), subject)) {
+                        return method;
+                    }
+                }
+            } else {
+                s_logger.error("Handler class " + handlerClz.getName() + " is not registered");
+            }
 		}
-		
+
 		return null;
 	}
-	
+
 	private static boolean match(String expression, String param) {
 		return param.matches(expression);
 	}
+
+    private void buildHandlerMethodCache(Class<?> handlerClz) {
+        if (s_logger.isInfoEnabled())
+            s_logger.info("Build message handler cache for " + handlerClz.getName());
+
+        synchronized (s_handlerCache) {
+            List<Method> handlerList = s_handlerCache.get(handlerClz);
+            if (handlerList == null) {
+                handlerList = new ArrayList<Method>();
+                s_handlerCache.put(handlerClz, handlerList);
+
+                Class<?> clz = handlerClz;
+                while (clz != null && clz != Object.class) {
+                    for (Method method : clz.getDeclaredMethods()) {
+                        MessageHandler annotation = method.getAnnotation(MessageHandler.class);
+                        if (annotation != null) {
+                            // allow private member access via reflection
+                            method.setAccessible(true);
+                            handlerList.add(method);
+
+                            if (s_logger.isInfoEnabled())
+                                s_logger.info("Add message handler " + handlerClz.getName()
+ "." + method.getName() + " to cache");
+                        }
+                    }
+
+                    clz = clz.getSuperclass();
+                }
+            } else {
+                if (s_logger.isInfoEnabled())
+                    s_logger.info("Message handler for class " + handlerClz.getName() + "
is already in cache");
+            }
+        }
+
+        if (s_logger.isInfoEnabled())
+            s_logger.info("Done building message handler cache for " + handlerClz.getName());
+    }
 }


Mime
View raw message