cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mc...@apache.org
Subject git commit: updated refs/heads/master to 248e4fb
Date Fri, 17 Oct 2014 17:51:11 GMT
Repository: cloudstack
Updated Branches:
  refs/heads/master a1b913db2 -> 248e4fbda


CLOUDSTACK-7749: The AsyncJob GC thread cannot purge queue items that have been blocking
for too long if an exception is thrown while expunging unfinished or completed old jobs;
this can leave some future jobs stuck.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/248e4fbd
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/248e4fbd
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/248e4fbd

Branch: refs/heads/master
Commit: 248e4fbdacb1372da94cbcf49ca23a14c8c9b514
Parents: a1b913d
Author: Min Chen <min.chen@citrix.com>
Authored: Thu Oct 16 18:15:50 2014 -0700
Committer: Min Chen <min.chen@citrix.com>
Committed: Fri Oct 17 10:43:59 2014 -0700

----------------------------------------------------------------------
 .../jobs/dao/SyncQueueItemDaoImpl.java          |  2 +-
 .../jobs/impl/AsyncJobManagerImpl.java          | 57 ++++++++++++--------
 2 files changed, 35 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/248e4fbd/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 41f1419..167d9f5 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -147,7 +147,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
         SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
         sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
-        sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
+        sbItem.and("lastProcessTime", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
 
         sbItem.done();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/248e4fbd/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 7e65ede..04fab24 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -769,46 +769,57 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
             public void reallyRun() {
                 try {
-                    s_logger.trace("Begin cleanup expired async-jobs");
+                    s_logger.info("Begin cleanup expired async-jobs");
 
-                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value()
* 60000);
+                    // forcefully cancel blocking queue items if they've been staying there
for too long
+                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value()
* 60000, false);
+                    if (blockItems != null && blockItems.size() > 0) {
+                        for (SyncQueueItemVO item : blockItems) {
+                            try {
+                                if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType))
{
+                                    s_logger.info("Remove Job-" + item.getContentId() + "
from Queue-" + item.getId() + " since it has been blocked for too long");
+                                    completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED,
0, "Job is cancelled as it has been blocking others for too long");
+
+                                    _jobMonitor.unregisterByJobId(item.getContentId());
+                                }
+
+                                // purge the item and resume queue processing
+                                _queueMgr.purgeItem(item.getId());
+                            } catch (Throwable e) {
+                                s_logger.error("Unexpected exception when trying to remove
job from sync queue, ", e);
+                            }
+                        }
+                    }
 
+                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value()
* 60000);
                     // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs
per minute
                     // hopefully this will be fast enough to balance potential growth of
job table
                     // 1) Expire unfinished jobs that weren't processed yet
                     List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime,
100);
                     for (AsyncJobVO job : unfinishedJobs) {
-                        s_logger.info("Expunging unfinished job " + job);
+                        try {
+                            s_logger.info("Expunging unfinished job-" + job.getId());
 
-                        _jobMonitor.unregisterByJobId(job.getId());
-                        expungeAsyncJob(job);
+                            _jobMonitor.unregisterByJobId(job.getId());
+                            expungeAsyncJob(job);
+                        } catch (Throwable e) {
+                            s_logger.error("Unexpected exception when trying to expunge job-"
+ job.getId(), e);
+                        }
                     }
 
                     // 2) Expunge finished jobs
                     List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime,
100);
                     for (AsyncJobVO job : completedJobs) {
-                        s_logger.trace("Expunging completed job " + job);
-
-                        expungeAsyncJob(job);
-                    }
-
-                    // forcefully cancel blocking queue items if they've been staying there
for too long
-                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value()
* 60000, false);
-                    if (blockItems != null && blockItems.size() > 0) {
-                        for (SyncQueueItemVO item : blockItems) {
-                            if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType))
{
-                                s_logger.info("Remove Job-" + item.getContentId() + " from
Queue-" + item.getId() + " since it has been blocked for too long");
-                                completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED,
0, "Job is cancelled as it has been blocking others for too long");
-
-                                _jobMonitor.unregisterByJobId(item.getContentId());
-                            }
+                        try {
+                            s_logger.info("Expunging completed job-" + job.getId());
 
-                            // purge the item and resume queue processing
-                            _queueMgr.purgeItem(item.getId());
+                            expungeAsyncJob(job);
+                        } catch (Throwable e) {
+                            s_logger.error("Unexpected exception when trying to expunge job-"
+ job.getId(), e);
                         }
                     }
 
-                    s_logger.trace("End cleanup expired async-jobs");
+                    s_logger.info("End cleanup expired async-jobs");
                 } catch (Throwable e) {
                     s_logger.error("Unexpected exception when trying to execute queue item,
", e);
                 }


Mime
View raw message