Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0C5C3F739 for ; Mon, 13 May 2013 01:16:02 +0000 (UTC) Received: (qmail 74022 invoked by uid 500); 13 May 2013 01:16:01 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 73977 invoked by uid 500); 13 May 2013 01:16:01 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 73969 invoked by uid 99); 13 May 2013 01:16:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 May 2013 01:16:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 8C18D88F08C; Mon, 13 May 2013 01:16:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kelveny@apache.org To: commits@cloudstack.apache.org Message-Id: <1ae8d5930b0c49dead45cbb0b3edcb11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: updated refs/heads/vmsync to c7530db Date: Mon, 13 May 2013 01:16:01 +0000 (UTC) Updated Branches: refs/heads/vmsync 2210c1027 -> c7530dbd7 Hook job monitoring Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c7530dbd Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c7530dbd Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c7530dbd Branch: refs/heads/vmsync Commit: c7530dbd70b256472c834158d4cbcf45d6235c50 Parents: 2210c10 Author: Kelven Yang Authored: Sun May 12 18:15:47 2013 -0700 Committer: Kelven Yang Committed: Sun May 12 18:15:47 2013 -0700 ---------------------------------------------------------------------- server/src/com/cloud/async/AsyncJobConstants.java | 2 ++ .../src/com/cloud/async/AsyncJobManagerImpl.java | 14 ++++++++++---- server/src/com/cloud/async/AsyncJobMonitor.java | 4 +++- .../test/com/cloud/vm/VmWorkTestConfiguration.java | 6 ++++++ 4 files changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobConstants.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java index bbebad6..6081d0c 100644 --- a/server/src/com/cloud/async/AsyncJobConstants.java +++ b/server/src/com/cloud/async/AsyncJobConstants.java @@ -24,6 +24,8 @@ public interface AsyncJobConstants { public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; + public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor"; + // Although we may have detailed masks for each individual wakeup event, i.e. // periodical timer, matched topic from message bus, it seems that we don't // need to distinguish them to such level. Therefore, only one wakeup signal http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index f23f3d7..cde184a 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -92,6 +92,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AsyncJobJoinMapDao _joinMapDao; @Inject private List _jobDispatchers; @Inject private MessageBus _messageBus; + @Inject private AsyncJobMonitor _jobMonitor; // property private String defaultDispatcher; @@ -524,6 +525,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } + _jobMonitor.registerActiveTask(job.getId()); AsyncJobExecutionContext.setCurrentExecutionContext( (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) ); @@ -560,6 +562,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } finally { // guard final clause as well try { + AsyncJobVO jobToUpdate = _jobDao.findById(job.getId()); + jobToUpdate.setExecutingMsid(null); + _jobDao.update(job.getId(), jobToUpdate); + if (job.getSyncSource() != null) { _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); @@ -569,6 +575,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // clean execution environment // AsyncJobExecutionContext.setCurrentExecutionContext(null); + _jobMonitor.unregisterActiveTask(job.getId()); try { JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); @@ -608,7 +615,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, job.setSyncSource(item); job.setExecutingMsid(getMsid()); - job.setCompleteMsid(getMsid()); _jobDao.update(job.getId(), job); try { @@ -616,10 +622,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } catch(RejectedExecutionException e) { s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn"); _queueMgr.returnItem(item.getId()); - } finally { + job.setExecutingMsid(null); _jobDao.update(job.getId(), job); - } + } } else { if(s_logger.isDebugEnabled()) { @@ -838,7 +844,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, int poolSize = (cloudMaxActive * 2) / 3; s_logger.info("Start AsyncJobManager thread pool in size " + poolSize); - _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("Job-Executor")); + _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobConstants.JOB_POOL_THREAD_PREFIX)); } catch (final Exception e) { throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/src/com/cloud/async/AsyncJobMonitor.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java index bd5b2cd..98c340b 100644 --- a/server/src/com/cloud/async/AsyncJobMonitor.java +++ b/server/src/com/cloud/async/AsyncJobMonitor.java @@ -105,10 +105,12 @@ public class AsyncJobMonitor extends ManagerBase { return true; } - public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) { + public void registerActiveTask(long jobId) { synchronized(this) { assert(_activeTasks.get(jobId) == null); + long threadId = Thread.currentThread().getId(); + boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX); ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread); _activeTasks.put(jobId, record); if(fromPoolThread) http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c7530dbd/server/test/com/cloud/vm/VmWorkTestConfiguration.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java index 67a3c00..93aa41a 100644 --- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java +++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.cloud.api.ApiDispatcher; +import com.cloud.async.AsyncJobMonitor; import com.cloud.async.SyncQueueManager; import com.cloud.async.SyncQueueManagerImpl; import com.cloud.async.dao.AsyncJobDao; @@ -123,4 +124,9 @@ public class VmWorkTestConfiguration { public VMInstanceDao vmInstanceDao() { return Mockito.mock(VMInstanceDao.class); } + + @Bean + public AsyncJobMonitor jobMonitor() { + return Mockito.mock(AsyncJobMonitor.class); + } }