cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/vmsync to 39dd4bd
Date Wed, 12 Jun 2013 21:36:37 GMT
Updated Branches:
  refs/heads/vmsync e440bc742 -> 39dd4bd26


Manage active job in run numbers


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/39dd4bd2
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/39dd4bd2
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/39dd4bd2

Branch: refs/heads/vmsync
Commit: 39dd4bd26bda909050ba2cf62579cd9c0c6c239b
Parents: e440bc7
Author: Kelven Yang <kelveny@gmail.com>
Authored: Wed Jun 12 14:29:09 2013 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Wed Jun 12 14:29:44 2013 -0700

----------------------------------------------------------------------
 .../framework/jobs/impl/AsyncJobMonitor.java    | 20 +++-----
 .../com/cloud/async/AsyncJobManagerImpl.java    | 48 ++++++++++++--------
 .../com/cloud/async/TestAsyncJobManager.java    |  2 +-
 3 files changed, 38 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/39dd4bd2/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
index ea7af83..cd10643 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -107,7 +107,7 @@ public class AsyncJobMonitor extends ManagerBase {
 		return true;
 	}
 	
-	public void registerActiveTask(long jobId) {
+	public void registerActiveTask(long runNumber, long jobId) {
 		synchronized(this) {
 			s_logger.info("Add job-" + jobId + " into job monitoring");
 			
@@ -116,7 +116,7 @@ public class AsyncJobMonitor extends ManagerBase {
 			long threadId = Thread.currentThread().getId();
 			boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
             ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
-			_activeTasks.put(jobId, record);
+			_activeTasks.put(runNumber, record);
 			if(fromPoolThread)
 				_activePoolThreads++;
 			else
@@ -124,29 +124,23 @@ public class AsyncJobMonitor extends ManagerBase {
 		}
 	}
 	
-	public void unregisterActiveTask(long jobId) {
+	public void unregisterActiveTask(long runNumber) {
 		synchronized(this) {
-			s_logger.info("Remove job-" + jobId + " from job monitoring");
-
-			ActiveTaskRecord record = _activeTasks.get(jobId);
+			ActiveTaskRecord record = _activeTasks.get(runNumber);
 			assert(record != null);
 			if(record != null) {
+				s_logger.info("Remove job-" + record.getJobId() + " from job monitoring");
+				
 				if(record.isPoolThread())
 					_activePoolThreads--;
 				else
 					_activeInplaceThreads--;
 				
-				_activeTasks.remove(jobId);
+				_activeTasks.remove(runNumber);
 			}
 		}
 	}
 	
-	public boolean isJobActive(long jobId) {
-		synchronized(this) {
-			return _activeTasks.get(jobId) != null;
-		}
-	}
-	
 	public int getActivePoolThreads() {
 		return _activePoolThreads;
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/39dd4bd2/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 81350c4..d65347c 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -109,6 +109,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
 	private long _jobExpireSeconds = 86400;						// 1 day
     private long _jobCancelThresholdSeconds = 3600;         	// 1 hour (for cancelling the
jobs blocking other jobs)
+    
+    private volatile long _executionRunNumber = 1;
 
     private final ScheduledExecutorService _heartbeatScheduler =
             Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
@@ -507,11 +509,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     	return null;
     }
     
+    private long getJobRunNumber() {
+    	synchronized(this) {
+    		return this._executionRunNumber++;
+    	}
+    }
+    
     private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJob job) {
         return new Runnable() {
             @Override
             public void run() {
             	Transaction txn = null;
+            	long runNumber = getJobRunNumber();
+            	
             	try {
             		//
             		// setup execution environment
@@ -521,10 +531,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     try {
                         JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(),
new AsyncJobMBeanImpl(job));
                     } catch(Exception e) {
-                        s_logger.warn("Unable to register active job " + job.getId() + "
to JMX monitoring due to exception " + ExceptionUtil.toString(e));
+                		// Due to co-existence of normal-dispatched-job/wakeup-dispatched-job,
MBean register() call
+                		// is expected to fail under situations
+                    	if(s_logger.isTraceEnabled())
+                    		s_logger.trace("Unable to register active job " + job.getId() + " to
JMX monitoring due to exception " + ExceptionUtil.toString(e));
                     }
                     
-                    _jobMonitor.registerActiveTask(job.getId());
+                    _jobMonitor.registerActiveTask(runNumber, job.getId());
                     AsyncJobExecutionContext.setCurrentExecutionContext(
                     	(AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job))
                     );
@@ -535,17 +548,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     }
 
                     if((getAndResetPendingSignals(job) & AsyncJobConstants.SIGNAL_MASK_WAKEUP)
!= 0) {
-                    	if(!_jobMonitor.isJobActive(job.getId())) {
-	                    	AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
-	                    	if(jobDispatcher != null) {
-	                    		jobDispatcher.runJob(job);
-	                    	} else {
-	                    		s_logger.error("Unable to find a wakeup dispatcher from the joined
job. job-" + job.getId());
-	                    	}
+                    	AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
+                    	if(jobDispatcher != null) {
+                    		jobDispatcher.runJob(job);
+                    	} else {
+                    		s_logger.error("Unable to find a wakeup dispatcher from the joined
job. job-" + job.getId());
                     	}
                     } else {
-                    	assert(_jobMonitor.isJobActive(job.getId()));
-                    	
 	                    AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
 	                    if(jobDispatcher != null) {
 	                    	jobDispatcher.runJob(job);
@@ -574,21 +583,24 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                             checkQueue(job.getSyncSource().getQueueId());
                         }
 
-                    	//
-                    	// clean execution environment
-                    	//
-                        AsyncJobExecutionContext.setCurrentExecutionContext(null);
-                        _jobMonitor.unregisterActiveTask(job.getId());
-                   	
                     	try {
                     		JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
                     	} catch(Exception e) {
-                            s_logger.warn("Unable to unregister job " + job.getId() + " to
JMX monitoring due to exception " + ExceptionUtil.toString(e));
+                    		// Due to co-existence of normal-dispatched-job/wakeup-dispatched-job,
MBean unregister() call
+                    		// is expected to fail under situations
+                    		if(s_logger.isTraceEnabled())
+                    			s_logger.trace("Unable to unregister job " + job.getId() + " to JMX
monitoring due to exception " + ExceptionUtil.toString(e));
                     	}
                     	
 	                    if(txn != null)
 	                    	txn.close();
 	                    
+                    	//
+                    	// clean execution environment
+                    	//
+                        AsyncJobExecutionContext.setCurrentExecutionContext(null);
+                        _jobMonitor.unregisterActiveTask(runNumber);
+	                    
                     } catch(Throwable e) {
                 		s_logger.error("Double exception", e);
                     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/39dd4bd2/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index d085777..74c22f8 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -217,7 +217,7 @@ public class TestAsyncJobManager extends TestCase {
 		});
 		thread.start();
     
-		jobMonitor.registerActiveTask(1);
+		jobMonitor.registerActiveTask(1, 1);
 		
     	asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() {
     		public boolean checkCondition() {


Mime
View raw message