cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/vmsync to c7530db
Date Mon, 13 May 2013 01:16:01 GMT
Updated Branches:
  refs/heads/vmsync 2210c1027 -> c7530dbd7


Hook job monitoring


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c7530dbd
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c7530dbd
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c7530dbd

Branch: refs/heads/vmsync
Commit: c7530dbd70b256472c834158d4cbcf45d6235c50
Parents: 2210c10
Author: Kelven Yang <kelveny@gmail.com>
Authored: Sun May 12 18:15:47 2013 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Sun May 12 18:15:47 2013 -0700

----------------------------------------------------------------------
 server/src/com/cloud/async/AsyncJobConstants.java  |    2 ++
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   14 ++++++++++----
 server/src/com/cloud/async/AsyncJobMonitor.java    |    4 +++-
 .../test/com/cloud/vm/VmWorkTestConfiguration.java |    6 ++++++
 4 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobConstants.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java
index bbebad6..6081d0c 100644
--- a/server/src/com/cloud/async/AsyncJobConstants.java
+++ b/server/src/com/cloud/async/AsyncJobConstants.java
@@ -24,6 +24,8 @@ public interface AsyncJobConstants {
 	public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
 	public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
 	
+	public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+	
 	// Although we may have detailed masks for each individual wakeup event, i.e.
 	// periodical timer, matched topic from message bus, it seems that we don't
 	// need to distinguish them to such level. Therefore, only one wakeup signal

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index f23f3d7..cde184a 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @Inject private AsyncJobJoinMapDao _joinMapDao;
     @Inject private List<AsyncJobDispatcher> _jobDispatchers;
     @Inject private MessageBus _messageBus;
+    @Inject private AsyncJobMonitor _jobMonitor;
 
     // property
     private String defaultDispatcher;
@@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                         s_logger.warn("Unable to register active job " + job.getId() + "
to JMX monitoring due to exception " + ExceptionUtil.toString(e));
                     }
                     
+                    _jobMonitor.registerActiveTask(job.getId());
                     AsyncJobExecutionContext.setCurrentExecutionContext(
                     	(AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
                     );
@@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             	} finally {
             		// guard final clause as well
                     try {
+                    	AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
+                    	jobToUpdate.setExecutingMsid(null);
+                    	_jobDao.update(job.getId(), jobToUpdate);
+                    	
                     	if (job.getSyncSource() != null) {
                             _queueMgr.purgeItem(job.getSyncSource().getId());
                             checkQueue(job.getSyncSource().getQueueId());
@@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     	// clean execution environment
                     	//
                         AsyncJobExecutionContext.setCurrentExecutionContext(null);
+                        _jobMonitor.unregisterActiveTask(job.getId());
                    	
                     	try {
                     		JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
@@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             job.setSyncSource(item);
             
             job.setExecutingMsid(getMsid());
-            job.setCompleteMsid(getMsid());
             _jobDao.update(job.getId(), job);
 
             try {
@@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             } catch(RejectedExecutionException e) {
                 s_logger.warn("Execution for job-" + job.getId() + " is rejected, return
it to the queue for next turn");
                 _queueMgr.returnItem(item.getId());
-            } finally {
+                
             	job.setExecutingMsid(null);
             	_jobDao.update(job.getId(), job);
-            }
+            } 
 
         } else {
             if(s_logger.isDebugEnabled()) {
@@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             int poolSize = (cloudMaxActive * 2) / 3;
 
             s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
-            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor"));
+            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX));
         } catch (final Exception e) {
             throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java
index bd5b2cd..98c340b 100644
--- a/server/src/com/cloud/async/AsyncJobMonitor.java
+++ b/server/src/com/cloud/async/AsyncJobMonitor.java
@@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase {
 		return true;
 	}
 	
-	public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) {
+	public void registerActiveTask(long jobId) {
 		synchronized(this) {
 			assert(_activeTasks.get(jobId) == null);
 			
+			long threadId = Thread.currentThread().getId();
+			boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
 			ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
 			_activeTasks.put(jobId, record);
 			if(fromPoolThread)

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index 67a3c00..93aa41a 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import com.cloud.api.ApiDispatcher;
+import com.cloud.async.AsyncJobMonitor;
 import com.cloud.async.SyncQueueManager;
 import com.cloud.async.SyncQueueManagerImpl;
 import com.cloud.async.dao.AsyncJobDao;
@@ -123,4 +124,9 @@ public class VmWorkTestConfiguration {
 	public VMInstanceDao vmInstanceDao() {
 		return Mockito.mock(VMInstanceDao.class);
 	}
+	
+	@Bean
+	public AsyncJobMonitor jobMonitor() {
+		return Mockito.mock(AsyncJobMonitor.class);
+	}
 }


Mime
View raw message