Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D070DD103 for ; Fri, 24 May 2013 22:00:17 +0000 (UTC) Received: (qmail 72482 invoked by uid 500); 24 May 2013 22:00:17 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 72452 invoked by uid 500); 24 May 2013 22:00:17 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 72445 invoked by uid 99); 24 May 2013 22:00:17 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 May 2013 22:00:17 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2E434ED50; Fri, 24 May 2013 22:00:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: alena1108@apache.org To: commits@cloudstack.apache.org Message-Id: <611326e07d43417380fddef0e5df4ece@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: updated refs/heads/master to 2ecf9e3 Date: Fri, 24 May 2013 22:00:17 +0000 (UTC) Updated Branches: refs/heads/master 6f65a5bbe -> 2ecf9e329 CLOUDSTACK-2680: Async job expunge thread - expunge only: 1) Unfinished jobs that are yet to be processed. 
2) Completed jobs. Jobs that are in progress will be skipped by the expunge thread. Conflicts: server/src/com/cloud/async/dao/AsyncJobDao.java server/src/com/cloud/async/dao/AsyncJobDaoImpl.java server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2ecf9e32 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2ecf9e32 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2ecf9e32 Branch: refs/heads/master Commit: 2ecf9e3293d9b5f1ccffe0736bc8ef0cbf3b1529 Parents: 6f65a5b Author: Alena Prokharchyk Authored: Fri May 24 14:08:28 2013 -0700 Committer: Alena Prokharchyk Committed: Fri May 24 15:01:12 2013 -0700 ---------------------------------------------------------------------- .../src/com/cloud/async/AsyncJobManagerImpl.java | 17 ++- server/src/com/cloud/async/dao/AsyncJobDao.java | 5 +- .../src/com/cloud/async/dao/AsyncJobDaoImpl.java | 88 +++++++++------ 3 files changed, 70 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 47d793f..0101a8a 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute // hopefully this will be fast enough to balance potential growth of job table - List l = _jobDao.getExpiredJobs(cutTime, 100); - if(l != null && l.size() > 0) { - for(AsyncJobVO job : l) { - expungeAsyncJob(job); - 
} + //1) Expire unfinished jobs that weren't processed yet + List l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); + for(AsyncJobVO job : l) { + s_logger.trace("Expunging unfinished job " + job); + expungeAsyncJob(job); + } + + //2) Expunge finished jobs + List completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); + for(AsyncJobVO job : completedJobs) { + s_logger.trace("Expunging completed job " + job); + expungeAsyncJob(job); } // forcefully cancel blocking queue items if they've been staying there for too long http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDao.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java b/server/src/com/cloud/async/dao/AsyncJobDao.java index 9d20759..9ab9b22 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDao.java +++ b/server/src/com/cloud/async/dao/AsyncJobDao.java @@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao; public interface AsyncJobDao extends GenericDao { AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId); - List getExpiredJobs(Date cutTime, int limit); + List getExpiredUnfinishedJobs(Date cutTime, int limit); void resetJobProcess(long msid, int jobResultCode, String jobResultMessage); -} + List getExpiredCompletedJobs(Date cutTime, int limit); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java index 4793a6e..b2c0d9c 100644 --- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java +++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java @@ -42,17 +42,19 @@ public class 
AsyncJobDaoImpl extends GenericDaoBase implements private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName()); private final SearchBuilder pendingAsyncJobSearch; - private final SearchBuilder pendingAsyncJobsSearch; - private final SearchBuilder expiringAsyncJobSearch; - - public AsyncJobDaoImpl() { - pendingAsyncJobSearch = createSearchBuilder(); - pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), - SearchCriteria.Op.EQ); - pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), - SearchCriteria.Op.EQ); - pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), - SearchCriteria.Op.EQ); + private final SearchBuilder pendingAsyncJobsSearch; + private final SearchBuilder expiringUnfinishedAsyncJobSearch; + private final SearchBuilder expiringCompletedAsyncJobSearch; + + + public AsyncJobDaoImpl() { + pendingAsyncJobSearch = createSearchBuilder(); + pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), + SearchCriteria.Op.EQ); pendingAsyncJobSearch.done(); pendingAsyncJobsSearch = createSearchBuilder(); @@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements SearchCriteria.Op.EQ); pendingAsyncJobsSearch.done(); - expiringAsyncJobSearch = createSearchBuilder(); - expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), + expiringUnfinishedAsyncJobSearch = createSearchBuilder(); + expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ); - expiringAsyncJobSearch.done(); - } - - public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { - 
SearchCriteria sc = pendingAsyncJobSearch.create(); - sc.setParameters("instanceType", instanceType); - sc.setParameters("instanceId", instanceId); - sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS); - - List l = listIncludingRemovedBy(sc); - if(l != null && l.size() > 0) { - if(l.size() > 1) { - s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job"); - } - - return l.get(0); - } - return null; + expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL); + expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ); + expiringUnfinishedAsyncJobSearch.done(); + + expiringCompletedAsyncJobSearch = createSearchBuilder(); + expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(), + SearchCriteria.Op.LTEQ); + expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NNULL); + expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ); + expiringCompletedAsyncJobSearch.done(); + } + + public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { + SearchCriteria sc = pendingAsyncJobSearch.create(); + sc.setParameters("instanceType", instanceType); + sc.setParameters("instanceId", instanceId); + sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS); + + List l = listIncludingRemovedBy(sc); + if(l != null && l.size() > 0) { + if(l.size() > 1) { + s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job"); + } + + return l.get(0); + } + return null; } public List findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) { @@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends GenericDaoBase implements return 
listBy(sc); } - public List getExpiredJobs(Date cutTime, int limit) { - SearchCriteria sc = expiringAsyncJobSearch.create(); + @Override + public List getExpiredUnfinishedJobs(Date cutTime, int limit) { + SearchCriteria sc = expiringUnfinishedAsyncJobSearch.create(); + sc.setParameters("created", cutTime); + sc.setParameters("jobStatus", 0); + Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); + return listIncludingRemovedBy(sc, filter); + } + + @Override + public List getExpiredCompletedJobs(Date cutTime, int limit) { + SearchCriteria sc = expiringCompletedAsyncJobSearch.create(); sc.setParameters("created", cutTime); + sc.setParameters("jobStatus", 0); Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); return listIncludingRemovedBy(sc, filter); }