cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/vmsync to 2210c10
Date Sun, 12 May 2013 23:46:15 GMT
Updated Branches:
  refs/heads/vmsync 2320b1c2c -> 2210c1027


cleanup, bug-fixes for job wakeup process


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2210c102
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2210c102
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2210c102

Branch: refs/heads/vmsync
Commit: 2210c102713a3a56404e0e09b2a6b590740a8401
Parents: 2320b1c
Author: Kelven Yang <kelveny@gmail.com>
Authored: Sun May 12 16:46:01 2013 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Sun May 12 16:46:01 2013 -0700

----------------------------------------------------------------------
 .../com/cloud/async/AsyncJobExecutionContext.java  |    4 +
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   25 ++++-
 .../src/com/cloud/async/SyncQueueManagerImpl.java  |   13 ++-
 server/src/com/cloud/async/dao/AsyncJobDao.java    |    1 +
 .../src/com/cloud/async/dao/AsyncJobDaoImpl.java   |   11 ++
 .../com/cloud/async/dao/AsyncJobJoinMapDao.java    |    4 +-
 .../cloud/async/dao/AsyncJobJoinMapDaoImpl.java    |   87 +++++++++++++-
 .../com/cloud/async/dao/SyncQueueItemDaoImpl.java  |    2 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java    |    9 ++
 server/src/com/cloud/vm/VmWorkJobDao.java          |    5 +-
 server/src/com/cloud/vm/VmWorkJobDaoImpl.java      |   16 +++
 server/src/com/cloud/vm/VmWorkJobDispatcher.java   |    7 +
 .../vm/VmWorkMockVirtualMachineManagerImpl.java    |   24 +++-
 server/test/com/cloud/vm/VmWorkTest.java           |    6 +-
 .../com/cloud/vm/VmWorkTestApiJobDispatcher.java   |   83 ++++++++++++++
 .../test/com/cloud/vm/VmWorkTestConfiguration.java |   13 ++
 .../test/com/cloud/vm/VmWorkTestJobDispatcher.java |   61 ----------
 .../com/cloud/vm/VmWorkTestWorkJobDispatcher.java  |   25 ++++
 server/test/resources/VmWorkTestContext.xml        |   11 ++-
 utils/conf/log4j-vmops.xml                         |    8 +-
 20 files changed, 326 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index 9c58459..988f4e6 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -44,6 +44,10 @@ public class AsyncJobExecutionContext  {
 		return _job.getSyncSource();
 	}
 	
+	public void resetSyncSource() {
+		_job.setSyncSource(null);
+	}
+	
 	public AsyncJob getJob() {
 		if(_job == null) {
 			_job = _jobMgr.getPseudoJob();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index e495838..f23f3d7 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -235,8 +235,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
             job.setLastUpdated(DateUtil.currentGMTTime());
             _jobDao.update(jobId, job);
-            txn.commit();
             
+        	List<Long> wakeupList = _joinMapDao.wakeupByJoinedJobCompletion(jobId);
+            _joinMapDao.disjoinAllJobs(jobId);
+            
+            txn.commit();
+
+            for(Long id : wakeupList) {
+            	// TODO, we assume that all jobs in this category is API job only
+            	AsyncJobVO jobToWakeup = _jobDao.findById(id);
+            	if(jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP)
!= 0)
+            	    scheduleExecution(jobToWakeup, false);
+            }
+             
             _messageBus.publish(null, TopicConstants.JOB_STATE, PublishScope.GLOBAL, (Long)jobId);
         } catch(Exception e) {
             s_logger.error("Unexpected exception while completing async job-" + jobId, e);
@@ -487,6 +498,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 	    			if(dispatcher.getName().equals(joinRecord.getWakeupDispatcher()))
 	    				return dispatcher;
 	    		}
+    		} else {
+    			s_logger.warn("job-" + job.getId() + " is scheduled for wakeup run, but there is no
joining info anymore");
     		}
     	}
     	return null;
@@ -689,7 +702,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                         }
                     }
               
-                    _joinMapDao.wakeupScan();
+                    List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan();
+                    for(Long jobId : standaloneWakeupJobs) {
+                    	// TODO, we assume that all jobs in this category is API job only
+                    	AsyncJobVO job = _jobDao.findById(jobId);
+                    	if(job != null && (job.getPendingSignals() & AsyncJobConstants.SIGNAL_MASK_WAKEUP)
!= 0)
+                    	    scheduleExecution(job, false);
+                    }
                 } catch(Throwable e) {
                     s_logger.error("Unexpected exception when trying to execute queue item,
", e);
                 } finally {
@@ -857,6 +876,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @Override
     public boolean start() {
         try {
+        	_jobDao.cleanupPseduoJobs(getMsid());
+        	
             List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
             cleanupPendingJobs(l);
             _jobDao.resetJobProcess(getMsid(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(),
getSerializedErrorMessage("job cancelled because of management server restart"));

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java
index eaa97af..ac41d87 100644
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@ -233,7 +233,18 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
     }
 
     private boolean queueReadyToProcess(SyncQueueVO queueVO) {
-        return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+    	return true;
+    	
+    	//
+    	// TODO
+    	//
+    	// Need to disable concurrency disable at queue level due to the need to support
+    	// job wake-up dispatching task
+    	//
+    	// Concurrency control is better done at higher level and leave the job scheduling/serializing
simpler
+    	//
+    	
+        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java b/server/src/com/cloud/async/dao/AsyncJobDao.java
index e3f9793..f09dff5 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDao.java
@@ -27,6 +27,7 @@ public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long>
{
 	List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId);
 	
 	AsyncJobVO findPseudoJob(long threadId, long msid);
+	void cleanupPseduoJobs(long msid);
 	
 	List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
 	void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
index eb43f9d..b8ac12d 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
@@ -40,6 +40,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long>
implements
 	private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;	
 	private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;	
 	private final SearchBuilder<AsyncJobVO> pseudoJobSearch;
+	private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch;
 	
 	public AsyncJobDaoImpl() {
 		pendingAsyncJobSearch = createSearchBuilder();
@@ -70,6 +71,10 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long>
implements
 		pseudoJobSearch.and("instanceType", pseudoJobSearch.entity().getInstanceType(), Op.EQ);
 		pseudoJobSearch.and("instanceId", pseudoJobSearch.entity().getInstanceId(), Op.EQ);
 		pseudoJobSearch.done();
+		
+		pseudoJobCleanupSearch = createSearchBuilder();
+		pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ);
+		pseudoJobCleanupSearch.done();
 	}
 	
 	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
@@ -116,6 +121,12 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long>
implements
 		return null;
 	}
 	
+	public void cleanupPseduoJobs(long msid) {
+		SearchCriteria<AsyncJobVO> sc = pseudoJobCleanupSearch.create();
+		sc.setParameters("initMsid", msid);
+		this.expunge(sc);
+	}
+	
 	public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
 		SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
 		sc.setParameters("created", cutTime);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
index 414b9ea..c3ee86a 100644
--- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
@@ -27,11 +27,13 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO,
Long>
 		long wakeupIntervalMs, long expirationMs,
 		Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
 	void disjoinJob(long jobId, long joinedJobId);
+	void disjoinAllJobs(long jobId);
 	
 	AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
 	List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
 	
 	void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
 	
-	void wakeupScan();
+	List<Long> wakeupScan();
+	List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
index 2e0500e..d69f343 100644
--- a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
@@ -17,7 +17,9 @@
 package com.cloud.async.dao;
 
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
@@ -93,6 +95,13 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO,
Lo
 		this.expunge(sc);
 	}
 	
+	public void disjoinAllJobs(long jobId) {
+		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
+		sc.setParameters("jobId", jobId);
+		
+		this.expunge(sc);
+	}
+	
 	public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
 		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
 		sc.setParameters("jobId", jobId);
@@ -128,7 +137,9 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO,
Lo
         update(ub, sc, null);
 	}
 
-	public void wakeupScan() {
+	public List<Long> wakeupScan() {
+		List<Long> standaloneList = new ArrayList<Long>();
+		
 		Date cutDate = DateUtil.currentGMTTime();
 		
 		Transaction txn = Transaction.currentTxn();
@@ -139,27 +150,89 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO,
Lo
 			//
 			// performance sensitive processing, do it in plain SQL 
 			//
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration >
?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.executeUpdate();
+	        pstmt.close();
 			
-			String sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL
WHERE content_id IN " + 
-			 "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration >
? AND join_status = ?)";
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id
IN " + 
+					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration >
?)";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-	        pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
 	        pstmt.executeUpdate();
 	        pstmt.close();
+	        
+	        sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration
> ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        ResultSet rs = pstmt.executeQuery();
+	        while(rs.next()) {
+	        	standaloneList.add(rs.getLong(1));
+	        }
+	        rs.close();
+	        pstmt.close();
 			
-	        sql = "UPDATE async_job_join_map SET next_wakeup=next_wakeup + SEC_TO_TIME(wakeup_interval)
WHERE next_wakeup < ? AND expiration > ? AND join_status = ?";
+	        // update for next wake-up
+	        sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL
wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
 			pstmt = txn.prepareStatement(sql);
 	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-	        pstmt.setInt(3, AsyncJobConstants.STATUS_IN_PROGRESS);
 	        pstmt.executeUpdate();
 	        pstmt.close();
-
+	        
+	        txn.commit();
+		} catch (SQLException e) {
+			s_logger.error("Unexpected exception", e);
+		}
+        
+        return standaloneList;
+	}
+	
+	public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+		List<Long> standaloneList = new ArrayList<Long>();
+		
+		Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+			txn.start();
+			
+			//
+			// performance sensitive processing, do it in plain SQL 
+			//
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        pstmt.executeUpdate();
+	        pstmt.close();
+			
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id
IN " + 
+					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        pstmt.executeUpdate();
+	        pstmt.close();
+	        
+	        sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT
IN (SELECT content_id FROM sync_queue_item)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        ResultSet rs = pstmt.executeQuery();
+	        while(rs.next()) {
+	        	standaloneList.add(rs.getLong(1));
+	        }
+	        rs.close();
+	        pstmt.close();
+			
 	        txn.commit();
 		} catch (SQLException e) {
 			s_logger.error("Unexpected exception", e);
 		}
+        
+        return standaloneList;
 	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index 7c9bf5b..287ec10 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@ -77,7 +77,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
 		
 		String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
 					 " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
-                     " WHERE q.queue_size <= q.queue_size_limit AND i.queue_proc_number
IS NULL " +
+                     " WHERE i.queue_proc_number IS NULL " +
 					 " GROUP BY q.id " +
 					 " ORDER BY i.id " +
 					 " LIMIT 0, ?";

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 4674ef6..5a4f179 100755
--- a/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/server/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -118,6 +118,7 @@ import com.cloud.user.AccountManager;
 import com.cloud.user.User;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.UserDao;
+import com.cloud.utils.DateUtil;
 import com.cloud.utils.Journal;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
@@ -1898,6 +1899,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
         @Override
         public void run() {
             s_logger.trace("VM Operation Thread Running");
+            
+            try {
+	            Date cutDate = DateUtil.currentGMTTime();
+	            cutDate = new Date(cutDate.getTime() - 60000);
+	            _workJobDao.expungeCompletedWorkJobs(cutDate);
+            } catch(Throwable e) {
+            	s_logger.error("Unexpected exception", e);
+            }
 /*            
             try {
                 _workDao.cleanup(_cleanupWait);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/vm/VmWorkJobDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobDao.java b/server/src/com/cloud/vm/VmWorkJobDao.java
index d48e651..0db811d 100644
--- a/server/src/com/cloud/vm/VmWorkJobDao.java
+++ b/server/src/com/cloud/vm/VmWorkJobDao.java
@@ -16,6 +16,7 @@
 // under the License.
 package com.cloud.vm;
 
+import java.util.Date;
 import java.util.List;
 
 import com.cloud.utils.db.GenericDao;
@@ -24,6 +25,8 @@ import com.cloud.vm.VmWorkJobVO.Step;
 public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
 	VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId);
 	List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId);
-	public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId,
String jobCmd);
+	List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String
jobCmd);
+	
 	void updateStep(long workJobId, Step step);
+	void expungeCompletedWorkJobs(Date cutDate);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/vm/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobDaoImpl.java b/server/src/com/cloud/vm/VmWorkJobDaoImpl.java
index 34be3b2..641d9d9 100644
--- a/server/src/com/cloud/vm/VmWorkJobDaoImpl.java
+++ b/server/src/com/cloud/vm/VmWorkJobDaoImpl.java
@@ -16,10 +16,12 @@
 // under the License.
 package com.cloud.vm;
 
+import java.util.Date;
 import java.util.List;
 
 import javax.annotation.PostConstruct;
 
+import com.cloud.async.AsyncJobConstants;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.GenericDaoBase;
@@ -32,6 +34,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long>
implemen
 
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
+    protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
 	
 	public VmWorkJobDaoImpl() {
 	}
@@ -50,6 +53,11 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long>
implemen
 		PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(),
Op.NEQ);
 		PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(),
Op.EQ);
 		PendingWorkJobByCommandSearch.done();
+		
+		ExpungeWorkJobSearch = createSearchBuilder();
+		ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(),
Op.LT);
+		ExpungeWorkJobSearch.and("status", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
+		ExpungeWorkJobSearch.done();
 	}
 	
 	public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
@@ -96,4 +104,12 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long>
implemen
 		jobVo.setLastUpdated(DateUtil.currentGMTTime());
 		update(workJobId, jobVo);
 	}
+	
+	public void expungeCompletedWorkJobs(Date cutDate) {
+		SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
+		sc.setParameters("lastUpdated",cutDate);
+		sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+		
+		expunge(sc);
+	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobDispatcher.java b/server/src/com/cloud/vm/VmWorkJobDispatcher.java
index 740b81c..8742169 100644
--- a/server/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -68,6 +68,10 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
             // down correct type back to VirtualMachineManagerImpl. It is sad that we have
to write code like this
             //
             VirtualMachineGuru<VMInstanceVO> guru = _vmMgr.getVmGuru(vm);
+            assert(guru != null);
+            if(guru == null) {
+            	s_logger.error("Unable to find virtual Guru for VM type: " + vm.getType());
+            }
             
             UserContext.registerContext(work.getUserId(), account, null, false);
             try {
@@ -76,12 +80,15 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
         			handler.invoke(guru, work);
             		_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED,
0, null);
             	} else {
+                	s_logger.error("Unable to find VM work handler. " + cmd);
+            		
             		_asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED,
0, null);
             	}
             } finally {
                 UserContext.unregisterContext();
             }
         } catch(Throwable e) {
+        	s_logger.error("Unexpected exception", e);
             _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_FAILED, 0,
null);
         }
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
index 3ccc526..c078e39 100644
--- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
+++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java
@@ -24,13 +24,12 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
-import org.apache.cloudstack.framework.async.Void;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
-import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
-import com.cloud.async.AsyncJob.JournalType;
+import com.cloud.async.AsyncJobConstants;
 import com.cloud.async.AsyncJobExecutionContext;
 import com.cloud.deploy.DeployDestination;
 import com.cloud.deploy.DeploymentPlan;
@@ -59,7 +58,8 @@ import com.cloud.vm.VirtualMachine.Type;
 import com.cloud.vm.VirtualMachineProfile.Param;
 
 public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManager {
-
+    private static final Logger s_logger = Logger.getLogger(VmWorkMockVirtualMachineManagerImpl.class);
+	
 	@Inject MessageBus _msgBus;
 	
 	@Override
@@ -394,11 +394,23 @@ public class VmWorkMockVirtualMachineManagerImpl implements VirtualMachineManage
     public <T extends VMInstanceVO> T processVmStartWork(T vm, Map<VirtualMachineProfile.Param,
Object> params, User caller, Account account, DeploymentPlan planToDeploy)
             throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException
{
     	
+		try {
+			Thread.sleep(120000);
+		} catch (InterruptedException e) {
+		}
+    	
     	return vm;
     }
-	
+
+	int wakeupCount = 0;
 	public void processVmStartWakeup() {
-		System.out.println("processVmStartWakeup. job-" + AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId());
+		s_logger.info("processVmStartWakeup. job-" + AsyncJobExecutionContext.getCurrentExecutionContext().getJob().getId());
+		
+		if(wakeupCount++ < 3) {
+			AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
+		} else {
+			AsyncJobExecutionContext.getCurrentExecutionContext().completeAsyncJob(AsyncJobConstants.STATUS_SUCCEEDED,
0, null);
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkTest.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTest.java b/server/test/com/cloud/vm/VmWorkTest.java
index cbc22ab..7c427b2 100644
--- a/server/test/com/cloud/vm/VmWorkTest.java
+++ b/server/test/com/cloud/vm/VmWorkTest.java
@@ -63,7 +63,8 @@ public class VmWorkTest extends TestCase {
 	
 	@Before
 	public void setup() {
-		LogUtils.initLog4j("cloud-log4j.xml");
+		LogUtils.initLog4j("log4j-vmops.xml");
+		
     	ComponentContext.initComponentsLifeCycle();
        	_vmMgr = Mockito.spy(_vmMgr);
        	Mockito.when(_clusterMgr.getManagementNodeId()).thenReturn(1L);
@@ -117,7 +118,6 @@ public class VmWorkTest extends TestCase {
 		Assert.assertTrue(work.getParams().get(VirtualMachineProfile.Param.HaTag).equals(workClone.getParams().get(VirtualMachineProfile.Param.HaTag)));
 	}
 	
-	@Test
 	public void testVmWorkDispatcher() {
 		VmWorkJobVO workJob = new VmWorkJobVO();
 		workJob.setDispatcher("VmWorkJobDispatcher");
@@ -146,7 +146,7 @@ public class VmWorkTest extends TestCase {
 	public void testVmWorkWakeup() {
 		AsyncJobVO mainJob = new AsyncJobVO();
 		
-		mainJob.setDispatcher("TestJobDispatcher");
+		mainJob.setDispatcher("TestApiJobDispatcher");
 		mainJob.setAccountId(1L);
 		mainJob.setUserId(1L);
 		mainJob.setCmd("Dummy");

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
new file mode 100644
index 0000000..02d634c
--- /dev/null
+++ b/server/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.vm;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.inject.Inject;
+
+import com.cloud.api.ApiSerializerHelper;
+import com.cloud.async.AsyncJob;
+import com.cloud.async.AsyncJobDispatcher;
+import com.cloud.async.AsyncJobExecutionContext;
+import com.cloud.async.AsyncJobManager;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.utils.db.Transaction;
+
+public class VmWorkTestApiJobDispatcher extends AdapterBase implements AsyncJobDispatcher
{
+
+	@Inject AsyncJobManager _jobMgr;
+	
+	@Override
+	public void runJob(AsyncJob job) {
+		
+		// drop constraint check in order to do single table test
+		Statement stat = null;
+		try {
+			stat = Transaction.currentTxn().getConnection().createStatement();
+			stat.execute("SET foreign_key_checks = 0;");
+		} catch (SQLException e) {
+		} finally {
+			if(stat != null) {
+				try {
+					stat.close();
+				} catch (SQLException e) {
+				}
+			}
+		}
+		
+		VmWorkJobVO workJob = new VmWorkJobVO();
+    	
+		workJob.setDispatcher("TestWorkJobDispatcher");
+		workJob.setCmd(VmWorkConstants.VM_WORK_START);
+		
+		workJob.setAccountId(1L);
+		workJob.setUserId(1L);
+		workJob.setStep(VmWorkJobVO.Step.Starting);
+		workJob.setVmType(VirtualMachine.Type.ConsoleProxy);
+		workJob.setVmInstanceId(1L);
+
+		// save work context info (there are some duplications)
+		VmWorkStart workInfo = new VmWorkStart();
+		workInfo.setAccountId(1L);
+		workInfo.setUserId(1L);
+		workInfo.setVmId(1L);
+		workInfo.setPlan(null);
+		workInfo.setParams(null);
+		workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo));
+		
+		_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, 1L);
+		
+		_jobMgr.joinJob(job.getId(), workJob.getId(), "processVmStartWakeup", 
+				VmWorkConstants.VM_WORK_JOB_WAKEUP_DISPATCHER, 
+				new String[] {}, 
+				3000, 120000);
+		AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index 2496dbe..67a3c00 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -25,6 +25,8 @@ import com.cloud.async.SyncQueueManager;
 import com.cloud.async.SyncQueueManagerImpl;
 import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.async.dao.AsyncJobDaoImpl;
+import com.cloud.async.dao.AsyncJobJoinMapDao;
+import com.cloud.async.dao.AsyncJobJoinMapDaoImpl;
 import com.cloud.async.dao.AsyncJobJournalDao;
 import com.cloud.async.dao.AsyncJobJournalDaoImpl;
 import com.cloud.async.dao.SyncQueueDao;
@@ -37,6 +39,7 @@ import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.dao.EntityManager;
 import com.cloud.user.AccountManager;
 import com.cloud.user.dao.AccountDao;
+import com.cloud.vm.dao.VMInstanceDao;
 
 @Configuration
 public class VmWorkTestConfiguration {
@@ -110,4 +113,14 @@ public class VmWorkTestConfiguration {
 	public AsyncJobJournalDao jobJournalDao() {
 		return new AsyncJobJournalDaoImpl();
 	}
+	
+	@Bean
+	public AsyncJobJoinMapDao jobJoinMapDao()  {
+		return new AsyncJobJoinMapDaoImpl();
+	}
+	
+	@Bean
+	public VMInstanceDao vmInstanceDao() {
+		return Mockito.mock(VMInstanceDao.class);
+	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkTestJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestJobDispatcher.java
deleted file mode 100644
index 552bed5..0000000
--- a/server/test/com/cloud/vm/VmWorkTestJobDispatcher.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloud.vm;
-
-import javax.inject.Inject;
-
-import com.cloud.api.ApiSerializerHelper;
-import com.cloud.async.AsyncJob;
-import com.cloud.async.AsyncJobDispatcher;
-import com.cloud.async.AsyncJobManager;
-import com.cloud.utils.component.AdapterBase;
-
-public class VmWorkTestJobDispatcher extends AdapterBase implements AsyncJobDispatcher {
-
-	@Inject AsyncJobManager _jobMgr;
-	
-	@Override
-	public void runJob(AsyncJob job) {
-		VmWorkJobVO workJob = new VmWorkJobVO();
-    	
-		workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
-		workJob.setCmd(VmWorkConstants.VM_WORK_START);
-		
-		workJob.setAccountId(1L);
-		workJob.setUserId(1L);
-		workJob.setStep(VmWorkJobVO.Step.Starting);
-		workJob.setVmType(VirtualMachine.Type.ConsoleProxy);
-		workJob.setVmInstanceId(1L);
-
-		// save work context info (there are some duplications)
-		VmWorkStart workInfo = new VmWorkStart();
-		workInfo.setAccountId(1L);
-		workInfo.setUserId(1L);
-		workInfo.setVmId(1L);
-		workInfo.setPlan(null);
-		workInfo.setParams(null);
-		workJob.setCmdInfo(ApiSerializerHelper.toSerializedString(workInfo));
-		
-		_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, 1L);
-		
-		_jobMgr.joinJob(job.getId(), workJob.getId(), "processVmStartWakeup", 
-				VmWorkConstants.VM_WORK_JOB_WAKEUP_DISPATCHER, 
-				new String[] {}, 
-				3000, 120000);
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
new file mode 100644
index 0000000..8e9990b
--- /dev/null
+++ b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java
@@ -0,0 +1,25 @@
+package com.cloud.vm;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.async.AsyncJob;
+import com.cloud.async.AsyncJobConstants;
+import com.cloud.async.AsyncJobDispatcher;
+import com.cloud.async.AsyncJobExecutionContext;
+import com.cloud.utils.component.AdapterBase;
+
+public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher
{
+    public static final Logger s_logger = Logger.getLogger(VmWorkTestWorkJobDispatcher.class);
+
+	@Override
+	public void runJob(AsyncJob job) {
+		s_logger.info("Begin work job execution. job-" + job.getId());
+		try {
+			Thread.sleep(120000);
+		} catch (InterruptedException e) {
+		}
+		
+		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(AsyncJobConstants.STATUS_SUCCEEDED,
null);
+		s_logger.info("End work job execution. job-" + job.getId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/server/test/resources/VmWorkTestContext.xml
----------------------------------------------------------------------
diff --git a/server/test/resources/VmWorkTestContext.xml b/server/test/resources/VmWorkTestContext.xml
index 11f50f8..0ec9796 100644
--- a/server/test/resources/VmWorkTestContext.xml
+++ b/server/test/resources/VmWorkTestContext.xml
@@ -46,8 +46,15 @@
   <bean id="VmWorkJobDispatcher" class="com.cloud.vm.VmWorkJobDispatcher">
     <property name="name" value="VmWorkJobDispatcher" />
   </bean>
-  <bean id="TestJobDispatcher" class="com.cloud.vm.VmWorkTestJobDispatcher">
-    <property name="name" value="TestJobDispatcher" />
+  <bean id="VmWorkJobWakeupDispatcher" class="com.cloud.vm.VmWorkJobWakeupDispatcher">
+    <property name="name" value="VmWorkJobWakeupDispatcher" />
+  </bean>
+  
+  <bean id="TestApiJobDispatcher" class="com.cloud.vm.VmWorkTestApiJobDispatcher">
+    <property name="name" value="TestApiJobDispatcher" />
+  </bean>
+  <bean id="TestWorkJobDispatcher" class="com.cloud.vm.VmWorkTestWorkJobDispatcher">
+    <property name="name" value="TestWorkJobDispatcher" />
   </bean>
   
   <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase"
/>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2210c102/utils/conf/log4j-vmops.xml
----------------------------------------------------------------------
diff --git a/utils/conf/log4j-vmops.xml b/utils/conf/log4j-vmops.xml
index fafb8ee..a722e06 100644
--- a/utils/conf/log4j-vmops.xml
+++ b/utils/conf/log4j-vmops.xml
@@ -27,9 +27,9 @@ under the License.
 
    <!-- A time/date based rolling appender -->
    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-      <param name="File" value="/var/log/vmops/vmops-testcase.log"/>
+      <param name="File" value="vmops-testcase.log"/>
       <param name="Append" value="true"/>
-      <param name="Threshold" value="INFO"/>
+      <param name="Threshold" value="DEBUG"/>
 
       <!-- Rollover at midnight each day -->
       <param name="DatePattern" value="'.'yyyy-MM-dd"/>
@@ -45,7 +45,7 @@ under the License.
 
    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
       <param name="Target" value="System.out"/>
-      <param name="Threshold" value="INFO"/>
+      <param name="Threshold" value="DEBUG"/>
 
       <layout class="org.apache.log4j.PatternLayout">
          <param name="ConversionPattern" value="%d{ABSOLUTE} %5p %c{1}:%L - %m%n"/>
@@ -86,7 +86,7 @@ under the License.
    <!-- ======================= -->
 
    <root>
-      <level value="INFO"/>
+      <level value="DEBUG"/>
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="FILE"/>
    </root>


Mime
View raw message