cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alena1...@apache.org
Subject git commit: updated refs/heads/master to 2ecf9e3
Date Fri, 24 May 2013 22:00:17 GMT
Updated Branches:
  refs/heads/master 6f65a5bbe -> 2ecf9e329


CLOUDSTACK-2680: Async job expunge thread - expunge only:

1) Unfinished jobs that are yet to be processed.
2) Completed jobs

The jobs that are in process will be skipped by the expunge thread.

Conflicts:
	server/src/com/cloud/async/dao/AsyncJobDao.java
	server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
	server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2ecf9e32
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2ecf9e32
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2ecf9e32

Branch: refs/heads/master
Commit: 2ecf9e3293d9b5f1ccffe0736bc8ef0cbf3b1529
Parents: 6f65a5b
Author: Alena Prokharchyk <alena.prokharchyk@citrix.com>
Authored: Fri May 24 14:08:28 2013 -0700
Committer: Alena Prokharchyk <alena.prokharchyk@citrix.com>
Committed: Fri May 24 15:01:12 2013 -0700

----------------------------------------------------------------------
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   17 ++-
 server/src/com/cloud/async/dao/AsyncJobDao.java    |    5 +-
 .../src/com/cloud/async/dao/AsyncJobDaoImpl.java   |   88 +++++++++------
 3 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 47d793f..0101a8a 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
                     // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
                     // hopefully this will be fast enough to balance potential growth of job table
-                    List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
-                    if(l != null && l.size() > 0) {
-                        for(AsyncJobVO job : l) {
-                            expungeAsyncJob(job);
-                        }
+                    //1) Expire unfinished jobs that weren't processed yet
+                    List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
+                    for(AsyncJobVO job : l) {
+                    	s_logger.trace("Expunging unfinished job " + job);
+                        expungeAsyncJob(job);
+                    }       
+                    
+                    //2) Expunge finished jobs
+                    List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
+                    for(AsyncJobVO job : completedJobs) {
+                    	s_logger.trace("Expunging completed job " + job);
+                        expungeAsyncJob(job);
                     }
 
                     // forcefully cancel blocking queue items if they've been staying there for too long

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java b/server/src/com/cloud/async/dao/AsyncJobDao.java
index 9d20759..9ab9b22 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDao.java
@@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao;
 public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
 	AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
 	List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId);
-	List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
+	List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit);
 	void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);
-}
+	List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
index 4793a6e..b2c0d9c 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
@@ -42,17 +42,19 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
     private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
 	
 	private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;	
-	private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;	
-	private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;		
-	
-	public AsyncJobDaoImpl() {
-		pendingAsyncJobSearch = createSearchBuilder();
-		pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(),
-			SearchCriteria.Op.EQ);
-		pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(),
-			SearchCriteria.Op.EQ);
-		pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), 
-				SearchCriteria.Op.EQ);
+	private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;	
+	private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch;
+	private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
+
+	
+	public AsyncJobDaoImpl() {
+		pendingAsyncJobSearch = createSearchBuilder();
+		pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(),
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(),
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), 
+				SearchCriteria.Op.EQ);
 		pendingAsyncJobSearch.done();
 		
 		pendingAsyncJobsSearch = createSearchBuilder();
@@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
 				SearchCriteria.Op.EQ);
 		pendingAsyncJobsSearch.done();
 		
-		expiringAsyncJobSearch = createSearchBuilder();
-		expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), 
+		expiringUnfinishedAsyncJobSearch = createSearchBuilder();
+		expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(),
 			SearchCriteria.Op.LTEQ);
-		expiringAsyncJobSearch.done();
-	}
-	
-	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
-        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
-        sc.setParameters("instanceType", instanceType);
-        sc.setParameters("instanceId", instanceId);
-        sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
-        
-        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
-        if(l != null && l.size() > 0) {
-        	if(l.size() > 1) {
-        		s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
-        	}
-        	
-        	return l.get(0);
-        }
-        return null;
+		expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL);
+		expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
+		expiringUnfinishedAsyncJobSearch.done();
+		
+		expiringCompletedAsyncJobSearch = createSearchBuilder();
+		expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(),
+			SearchCriteria.Op.LTEQ);
+		expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NNULL);
+		expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ);
+		expiringCompletedAsyncJobSearch.done();
+	}
+	
+	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
+        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
+        sc.setParameters("instanceType", instanceType);
+        sc.setParameters("instanceId", instanceId);
+        sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
+        
+        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
+        if(l != null && l.size() > 0) {
+        	if(l.size() > 1) {
+        		s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
+        	}
+        	
+        	return l.get(0);
+        }
+        return null;
 	}
 	
 	public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) {
@@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
         return listBy(sc);
 	}
 	
-	public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
-		SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
+	@Override
+	public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
+		SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
+		sc.setParameters("created", cutTime);
+		sc.setParameters("jobStatus", 0);
+		Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
+		return listIncludingRemovedBy(sc, filter);
+	}
+	
+	@Override
+	public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit) {
+		SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
 		sc.setParameters("created", cutTime);
+		sc.setParameters("jobStatus", 0);
 		Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
 		return listIncludingRemovedBy(sc, filter);
 	}


Mime
View raw message