cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [04/12] Isolated the job queue work in its own project
Date Wed, 15 May 2013 17:13:06 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
new file mode 100644
index 0000000..7c57076
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
+    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+    @Inject private SyncQueueDao _syncQueueDao;
+    @Inject private SyncQueueItemDao _syncQueueItemDao;
+
+    @Override
+    @DB
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
+        Transaction txn = Transaction.currentTxn();
+        try {
+            txn.start();
+
+            _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+            SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+            if(queueVO == null)
+                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+
+            queueVO.setQueueSizeLimit(queueSizeLimit);
+            _syncQueueDao.update(queueVO.getId(), queueVO);
+
+            Date dt = DateUtil.currentGMTTime();
+            SyncQueueItemVO item = new SyncQueueItemVO();
+            item.setQueueId(queueVO.getId());
+            item.setContentType(itemType);
+            item.setContentId(itemId);
+            item.setCreated(dt);
+
+            _syncQueueItemDao.persist(item);
+            txn.commit();
+
+            return queueVO;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txn.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+            if(queueVO == null) {
+                s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+                txt.commit();
+                return null;
+            }
+
+            if(queueReadyToProcess(queueVO)) {
+                SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+                if(itemVO != null) {
+                    Long processNumber = queueVO.getLastProcessNumber();
+                    if(processNumber == null)
+                        processNumber = new Long(1);
+                    else
+                        processNumber = processNumber + 1;
+                    Date dt = DateUtil.currentGMTTime();
+                    queueVO.setLastProcessNumber(processNumber);
+                    queueVO.setLastUpdated(dt);
+                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                    itemVO.setLastProcessMsid(msid);
+                    itemVO.setLastProcessNumber(processNumber);
+                    itemVO.setLastProcessTime(dt);
+                    _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+                    txt.commit();
+                    return itemVO;
+                } else {
+                    if(s_logger.isDebugEnabled())
+                        s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+                }
+            } else {
+                if(s_logger.isDebugEnabled())
+                    s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+
+        return null;
+    }
+
+    @Override
+    @DB
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+        List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
+            if(l != null && l.size() > 0) {
+                for(SyncQueueItemVO item : l) {
+                    SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+                    SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+                    if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
+                        Long processNumber = queueVO.getLastProcessNumber();
+                        if(processNumber == null)
+                            processNumber = new Long(1);
+                        else
+                            processNumber = processNumber + 1;
+
+                        Date dt = DateUtil.currentGMTTime();
+                        queueVO.setLastProcessNumber(processNumber);
+                        queueVO.setLastUpdated(dt);
+                        queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                        _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                        itemVO.setLastProcessMsid(msid);
+                        itemVO.setLastProcessNumber(processNumber);
+                        itemVO.setLastProcessTime(dt);
+                        _syncQueueItemDao.update(item.getId(), itemVO);
+
+                        resultList.add(item);
+                    }
+                }
+            }
+            txt.commit();
+            return resultList;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public void purgeItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                _syncQueueItemDao.expunge(itemVO.getId());
+
+                // if item is active, reset queue information
+                if (itemVO.getLastProcessMsid() != null) {
+                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                    // decrement the count
+                    assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+                    queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+                }
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    @DB
+    public void returnItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                itemVO.setLastProcessMsid(null);
+                itemVO.setLastProcessNumber(null);
+                itemVO.setLastProcessTime(null);
+                _syncQueueItemDao.update(queueItemId, itemVO);
+
+                queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                _syncQueueDao.update(queueVO.getId(), queueVO);
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+        return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+        return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
+    }
+
+    private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+    	return true;
+    	
+    	//
+    	// TODO
+    	//
+    	// Need to disable concurrency disable at queue level due to the need to support
+    	// job wake-up dispatching task
+    	//
+    	// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
+    	//
+    	
+        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+    }
+
+    @Override
+    public void purgeAsyncJobQueueItemId(long asyncJobId) {
+        Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+        if (itemId != null) {
+            purgeItem(itemId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
new file mode 100644
index 0000000..cad3462
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+import java.util.Date;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue")
+public class SyncQueueVO implements InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id;
+
+    @Column(name="sync_objtype")
+
+    private String syncObjType;
+
+    @Column(name="sync_objid")
+    private Long syncObjId;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+
+    @Column(name="created")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date created;
+
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    @Column(name="queue_size")
+    private long queueSize = 0;
+
+    @Column(name="queue_size_limit")
+    private long queueSizeLimit = 0;
+
+    public long getId() {
+        return id;
+    }
+
+    public String getSyncObjType() {
+        return syncObjType;
+    }
+
+    public void setSyncObjType(String syncObjType) {
+        this.syncObjType = syncObjType;
+    }
+
+    public Long getSyncObjId() {
+        return syncObjId;
+    }
+
+    public void setSyncObjId(Long syncObjId) {
+        this.syncObjId = syncObjId;
+    }
+    
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+    
+    public void setLastProcessNumber(Long number) {
+        lastProcessNumber = number;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SyncQueueVO {id:").append(getId());
+        sb.append(", syncObjType: ").append(getSyncObjType());
+        sb.append(", syncObjId: ").append(getSyncObjId());
+        sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastUpdated: ").append(getLastUpdated());
+        sb.append(", created: ").append(getCreated());
+        sb.append(", count: ").append(getQueueSize());
+        sb.append("}");
+        return sb.toString();
+    }
+
+    public long getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(long queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    public long getQueueSizeLimit() {
+        return queueSizeLimit;
+    }
+
+    public void setQueueSizeLimit(long queueSizeLimit) {
+        this.queueSizeLimit = queueSizeLimit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
new file mode 100644
index 0000000..ba28495
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+
+import com.cloud.utils.db.GenericDao;
+
+public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
+	AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
+	List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId);
+	
+	AsyncJobVO findPseudoJob(long threadId, long msid);
+	void cleanupPseduoJobs(long msid);
+	
+	List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
+	void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
new file mode 100644
index 0000000..9559b9c
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.SearchCriteria.Op;
+
+public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
+    private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
+	
+	private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;	
+	private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;	
+	private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;	
+	private final SearchBuilder<AsyncJobVO> pseudoJobSearch;
+	private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch;
+	
+	public AsyncJobDaoImpl() {
+		pendingAsyncJobSearch = createSearchBuilder();
+		pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), 
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), 
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), 
+				SearchCriteria.Op.EQ);
+		pendingAsyncJobSearch.done();
+		
+		pendingAsyncJobsSearch = createSearchBuilder();
+		pendingAsyncJobsSearch.and("instanceType", pendingAsyncJobsSearch.entity().getInstanceType(), 
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobsSearch.and("accountId", pendingAsyncJobsSearch.entity().getAccountId(), 
+			SearchCriteria.Op.EQ);
+		pendingAsyncJobsSearch.and("status", pendingAsyncJobsSearch.entity().getStatus(), 
+				SearchCriteria.Op.EQ);
+		pendingAsyncJobsSearch.done();
+		
+		expiringAsyncJobSearch = createSearchBuilder();
+		expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), 
+			SearchCriteria.Op.LTEQ);
+		expiringAsyncJobSearch.done();
+		
+		pseudoJobSearch = createSearchBuilder();
+		pseudoJobSearch.and("jobDispatcher", pseudoJobSearch.entity().getDispatcher(), Op.EQ);
+		pseudoJobSearch.and("instanceType", pseudoJobSearch.entity().getInstanceType(), Op.EQ);
+		pseudoJobSearch.and("instanceId", pseudoJobSearch.entity().getInstanceId(), Op.EQ);
+		pseudoJobSearch.done();
+		
+		pseudoJobCleanupSearch = createSearchBuilder();
+		pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ);
+		pseudoJobCleanupSearch.done();
+	}
+	
+	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
+        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
+        sc.setParameters("instanceType", instanceType);
+        sc.setParameters("instanceId", instanceId);
+        sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+        
+        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
+        if(l != null && l.size() > 0) {
+        	if(l.size() > 1) {
+        		s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
+        	}
+        	
+        	return l.get(0);
+        }
+        return null;
+	}
+	
+	public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
+		SearchCriteria<AsyncJobVO> sc = pendingAsyncJobsSearch.create();
+        sc.setParameters("instanceType", instanceType);
+        
+        if (accountId != null) {
+            sc.setParameters("accountId", accountId);
+        }
+        sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS);
+        
+        return listBy(sc);
+	}
+	
+	public AsyncJobVO findPseudoJob(long threadId, long msid) {
+		SearchCriteria<AsyncJobVO> sc = pseudoJobSearch.create();
+		sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO);
+		sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE);
+		sc.setParameters("instanceId", threadId);
+		
+		List<AsyncJobVO> result = listBy(sc);
+		if(result != null && result.size() > 0) {
+			assert(result.size() == 1);
+			return result.get(0);
+		}
+		
+		return null;
+	}
+	
+	public void cleanupPseduoJobs(long msid) {
+		SearchCriteria<AsyncJobVO> sc = pseudoJobCleanupSearch.create();
+		sc.setParameters("initMsid", msid);
+		this.expunge(sc);
+	}
+	
+	public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
+		SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
+		sc.setParameters("created", cutTime);
+		Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
+		return listIncludingRemovedBy(sc, filter);
+	}
+
+	@DB
+	public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
+		String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode 
+			+ ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
+		
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = txn.prepareAutoCloseStatement(sql);
+            pstmt.setLong(1, msid);
+            pstmt.setLong(2, msid);
+            pstmt.execute();
+        } catch (SQLException e) {
+        	s_logger.warn("Unable to reset job status for management server " + msid, e);
+        } catch (Throwable e) {
+        	s_logger.warn("Unable to reset job status for management server " + msid, e);
+        }
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
new file mode 100644
index 0000000..acf1324
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+
+import com.cloud.utils.db.GenericDao;
+
+public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
+	
+	Long joinJob(long jobId, long joinJobId, long joinMsid, 
+		long wakeupIntervalMs, long expirationMs,
+		Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
+	void disjoinJob(long jobId, long joinedJobId);
+	void disjoinAllJobs(long jobId);
+	
+	AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
+	List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
+	
+	void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid);
+	
+	List<Long> wakeupScan();
+	List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
new file mode 100644
index 0000000..0b41953
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.UpdateBuilder;
+import com.cloud.utils.db.SearchCriteria.Op;
+
+public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
+	
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;	
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;	
+	private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;	
+	private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
+	
+	public AsyncJobJoinMapDaoImpl() {
+		RecordSearch = createSearchBuilder();
+		RecordSearch.and("jobId", RecordSearch.entity().getJobId(), Op.EQ);
+		RecordSearch.and("joinJobId", RecordSearch.entity().getJoinJobId(), Op.EQ);
+		RecordSearch.done();
+
+		RecordSearchByOwner = createSearchBuilder();
+		RecordSearchByOwner.and("jobId", RecordSearchByOwner.entity().getJobId(), Op.EQ);
+		RecordSearchByOwner.done();
+		
+		CompleteJoinSearch = createSearchBuilder();
+		CompleteJoinSearch.and("joinJobId", CompleteJoinSearch.entity().getJoinJobId(), Op.EQ);
+		CompleteJoinSearch.done();
+		
+		WakeupSearch = createSearchBuilder();
+		WakeupSearch.and("nextWakeupTime", WakeupSearch.entity().getNextWakeupTime(), Op.LT);
+		WakeupSearch.and("expiration", WakeupSearch.entity().getExpiration(), Op.GT);
+		WakeupSearch.and("joinStatus", WakeupSearch.entity().getJoinStatus(), Op.EQ);
+		WakeupSearch.done();
+	}
+	
+	public Long joinJob(long jobId, long joinJobId, long joinMsid, 
+		long wakeupIntervalMs, long expirationMs,
+		Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
+		
+		AsyncJobJoinMapVO record = new AsyncJobJoinMapVO();
+		record.setJobId(jobId);
+		record.setJoinJobId(joinJobId);
+		record.setJoinMsid(joinMsid);
+		record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS);
+		record.setSyncSourceId(syncSourceId);
+		record.setWakeupInterval(wakeupIntervalMs / 1000);		// convert millisecond to second
+		record.setWakeupHandler(wakeupHandler);
+		record.setWakeupDispatcher(wakeupDispatcher);
+		if(wakeupHandler != null) {
+			record.setNextWakeupTime(new Date(DateUtil.currentGMTTime().getTime() + wakeupIntervalMs));
+			record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
+		}
+		
+		this.persist(record);
+		return record.getId();
+	}
+	
+	public void disjoinJob(long jobId, long joinedJobId) {
+		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
+		sc.setParameters("jobId", jobId);
+		sc.setParameters("joinJobId", joinedJobId);
+		
+		this.expunge(sc);
+	}
+	
+	public void disjoinAllJobs(long jobId) {
+		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
+		sc.setParameters("jobId", jobId);
+		
+		this.expunge(sc);
+	}
+	
+	public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
+		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
+		sc.setParameters("jobId", jobId);
+		sc.setParameters("joinJobId", joinJobId);
+		
+		List<AsyncJobJoinMapVO> result = this.listBy(sc);
+		if(result != null && result.size() > 0) {
+			assert(result.size() == 1);
+			return result.get(0);
+		}
+		
+		return null;
+	}
+	
+	public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
+		SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearchByOwner.create();
+		sc.setParameters("jobId", jobId);
+		
+		return this.listBy(sc);
+	}
+	
+	public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) {
+        AsyncJobJoinMapVO record = createForUpdate();
+        record.setJoinStatus(joinStatus);
+        record.setJoinResult(joinResult);
+        record.setCompleteMsid(completeMsid);
+        record.setLastUpdated(DateUtil.currentGMTTime());
+        
+        UpdateBuilder ub = getUpdateBuilder(record);
+        
+        SearchCriteria<AsyncJobJoinMapVO> sc = CompleteJoinSearch.create();
+        sc.setParameters("joinJobId", joinJobId);
+        update(ub, sc, null);
+	}
+
+	public List<Long> wakeupScan() {
+		List<Long> standaloneList = new ArrayList<Long>();
+		
+		Date cutDate = DateUtil.currentGMTTime();
+		
+		Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+			txn.start();
+			
+			//
+			// performance sensitive processing, do it in plain SQL 
+			//
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.executeUpdate();
+	        pstmt.close();
+			
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + 
+					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.executeUpdate();
+	        pstmt.close();
+	        
+	        sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        ResultSet rs = pstmt.executeQuery();
+	        while(rs.next()) {
+	        	standaloneList.add(rs.getLong(1));
+	        }
+	        rs.close();
+	        pstmt.close();
+			
+	        // update for next wake-up
+	        sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+	        pstmt.executeUpdate();
+	        pstmt.close();
+	        
+	        txn.commit();
+		} catch (SQLException e) {
+			s_logger.error("Unexpected exception", e);
+		}
+        
+        return standaloneList;
+	}
+	
+	public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+		List<Long> standaloneList = new ArrayList<Long>();
+		
+		Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+			txn.start();
+			
+			//
+			// performance sensitive processing, do it in plain SQL 
+			//
+			String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " +  
+					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        pstmt.executeUpdate();
+	        pstmt.close();
+			
+			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + 
+					"(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        pstmt.executeUpdate();
+	        pstmt.close();
+	        
+	        sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+			pstmt = txn.prepareStatement(sql);
+	        pstmt.setLong(1, joinedJobId);
+	        ResultSet rs = pstmt.executeQuery();
+	        while(rs.next()) {
+	        	standaloneList.add(rs.getLong(1));
+	        }
+	        rs.close();
+	        pstmt.close();
+			
+	        txn.commit();
+		} catch (SQLException e) {
+			s_logger.error("Unexpected exception", e);
+		}
+        
+        return standaloneList;
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
new file mode 100644
index 0000000..e8d8287
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+
+import com.cloud.utils.db.GenericDao;
+
+public interface AsyncJobJournalDao extends GenericDao<AsyncJobJournalVO, Long> {
+	List<AsyncJobJournalVO> getJobJournal(long jobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
new file mode 100644
index 0000000..9d02307
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
+
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+
+public class AsyncJobJournalDaoImpl extends GenericDaoBase<AsyncJobJournalVO, Long> implements AsyncJobJournalDao {
+
+	private final SearchBuilder<AsyncJobJournalVO> JobJournalSearch;	
+
+	public AsyncJobJournalDaoImpl() {
+		JobJournalSearch = createSearchBuilder();
+		JobJournalSearch.and("jobId", JobJournalSearch.entity().getJobId(), Op.EQ);
+		JobJournalSearch.done();
+	}
+	
+	@Override
+	public List<AsyncJobJournalVO> getJobJournal(long jobId) {
+		SearchCriteria<AsyncJobJournalVO> sc = JobJournalSearch.create();
+		sc.setParameters("jobId", jobId);
+		
+		return this.listBy(sc);
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
new file mode 100644
index 0000000..f45245a
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+
+import com.cloud.utils.db.GenericDao;
+
+public interface SyncQueueDao extends GenericDao<SyncQueueVO, Long>{
+	public void ensureQueue(String syncObjType, long syncObjId);
+	public SyncQueueVO find(String syncObjType, long syncObjId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
new file mode 100644
index 0000000..05e2a73
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.SyncQueueVO;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+
+public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implements SyncQueueDao {
+    private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
+    
+    SearchBuilder<SyncQueueVO> TypeIdSearch = createSearchBuilder();
+
+    public SyncQueueDaoImpl() {
+	    super();
+	    TypeIdSearch = createSearchBuilder();
+        TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ);
+        TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ);
+        TypeIdSearch.done();
+	}
+	
+	@Override
+	public void ensureQueue(String syncObjType, long syncObjId) {
+		Date dt = DateUtil.currentGMTTime();
+        String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
+                " values(?, ?, ?, ?)";
+		
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = txn.prepareAutoCloseStatement(sql);
+            pstmt.setString(1, syncObjType);
+            pstmt.setLong(2, syncObjId);
+            pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt));
+            pstmt.setString(4, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt));
+            pstmt.execute();
+        } catch (SQLException e) {
+        	s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
+        } catch (Throwable e) {
+        	s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e);
+        }
+	}
+	
+	@Override
+	public SyncQueueVO find(String syncObjType, long syncObjId) {
+    	SearchCriteria<SyncQueueVO> sc = TypeIdSearch.create();
+    	sc.setParameters("syncObjType", syncObjType);
+    	sc.setParameters("syncObjId", syncObjId);
+        return findOneBy(sc);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
new file mode 100644
index 0000000..b78ccf7
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+
+import com.cloud.utils.db.GenericDao;
+
+public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
+	public SyncQueueItemVO getNextQueueItem(long queueId);
+	public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
+	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+	public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+	public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
new file mode 100644
index 0000000..08b777c
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.GenericSearchBuilder;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+
+@DB
+public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
+    private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
+    final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+    
+    public SyncQueueItemDaoImpl() {
+        super();
+        queueIdSearch = createSearchBuilder(Long.class);
+        queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
+        queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
+        queueIdSearch.selectField(queueIdSearch.entity().getId());
+        queueIdSearch.done();
+    }
+
+	@Override
+	public SyncQueueItemVO getNextQueueItem(long queueId) {
+		
+		SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+        sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
+        sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(),	SearchCriteria.Op.NULL);
+        sb.done();
+        
+    	SearchCriteria<SyncQueueItemVO> sc = sb.create();
+    	sc.setParameters("queueId", queueId);
+    	
+    	Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
+        List<SyncQueueItemVO> l = listBy(sc, filter);
+        if(l != null && l.size() > 0)
+        	return l.get(0);
+    	
+		return null;
+	}
+
+	@Override
+	public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
+		List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
+		
+		String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
+					 " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
+                     " WHERE i.queue_proc_number IS NULL " +
+					 " GROUP BY q.id " +
+					 " ORDER BY i.id " +
+					 " LIMIT 0, ?";
+
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = txn.prepareAutoCloseStatement(sql);
+            pstmt.setInt(1, maxItems);
+            ResultSet rs = pstmt.executeQuery();
+            while(rs.next()) {
+            	SyncQueueItemVO item = new SyncQueueItemVO();
+            	item.setId(rs.getLong(1));
+            	item.setQueueId(rs.getLong(2));
+            	item.setContentType(rs.getString(3));
+            	item.setContentId(rs.getLong(4));
+            	item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
+            	l.add(item);
+            }
+        } catch (SQLException e) {
+        	s_logger.error("Unexpected sql excetpion, ", e);
+        } catch (Throwable e) {
+        	s_logger.error("Unexpected excetpion, ", e);
+        }
+		return l;
+	}
+	
+	@Override
+	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+		SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+        sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
+    		SearchCriteria.Op.EQ);
+        sb.done();
+        
+    	SearchCriteria<SyncQueueItemVO> sc = sb.create();
+    	sc.setParameters("lastProcessMsid", msid);
+    	
+    	Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
+    	
+    	if(exclusive)
+    		return lockRows(sc, filter, true);
+        return listBy(sc, filter);
+	}
+
+    @Override
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+        Date cutTime = DateUtil.currentGMTTime();
+        
+        SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
+        sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
+        sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
+        sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
+        sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
+        
+        sbItem.done();
+        
+        SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
+        sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
+        
+        if(exclusive)
+            return lockRows(sc, null, true);
+        return listBy(sc, null);
+    }
+
+    @Override
+    public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) {
+        SearchCriteria<Long> sc = queueIdSearch.create();
+        sc.setParameters("contentId", contentId);
+        sc.setParameters("contentType", contentType);
+        List<Long> id = customSearch(sc, null);
+
+        return id.size() == 0 ? null : id.get(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index bc97b21..3a2b743 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -103,8 +103,6 @@
   </dependencies>
   <build>
     <defaultGoal>install</defaultGoal>
-    <sourceDirectory>src</sourceDirectory>
-    <testSourceDirectory>test</testSourceDirectory>
     <resources>
       <resource>
         <directory>resources</directory>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index 0984d25..331d90f 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -21,23 +21,25 @@ import java.util.Map;
 
 import javax.inject.Inject;
 
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
 import org.apache.cloudstack.api.ApiErrorCode;
 import org.apache.cloudstack.api.BaseAsyncCmd;
 import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ExceptionResponse;
-import org.apache.log4j.Logger;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 
-import com.cloud.async.AsyncJob;
-import com.cloud.async.AsyncJobConstants;
-import com.cloud.async.AsyncJobDispatcher;
-import com.cloud.async.AsyncJobManager;
 import com.cloud.user.Account;
 import com.cloud.user.UserContext;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.utils.component.AdapterBase;
 import com.cloud.utils.component.ComponentContext;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 
 public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispatcher {
     private static final Logger s_logger = Logger.getLogger(ApiAsyncJobDispatcher.class);
@@ -51,7 +53,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat
     }
     
 	@Override
-	public void runJob(AsyncJob job) {
+    public void runJob(AsyncJob job) {
         BaseAsyncCmd cmdObj = null;
         try {
             Class<?> cmdClass = Class.forName(job.getCmd());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiDBUtils.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiDBUtils.java b/server/src/com/cloud/api/ApiDBUtils.java
index f9bdcf3..5a86160 100755
--- a/server/src/com/cloud/api/ApiDBUtils.java
+++ b/server/src/com/cloud/api/ApiDBUtils.java
@@ -25,10 +25,12 @@ import java.util.Set;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
-import org.apache.cloudstack.api.ApiCommandJobType;
+import org.springframework.stereotype.Component;
+
 import org.apache.cloudstack.affinity.AffinityGroup;
 import org.apache.cloudstack.affinity.AffinityGroupResponse;
 import org.apache.cloudstack.affinity.dao.AffinityGroupDao;
+import org.apache.cloudstack.api.ApiCommandJobType;
 import org.apache.cloudstack.api.ApiConstants.HostDetails;
 import org.apache.cloudstack.api.ApiConstants.VMDetails;
 import org.apache.cloudstack.api.response.AccountResponse;
@@ -51,10 +53,12 @@ import org.apache.cloudstack.api.response.UserResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
 import org.apache.cloudstack.api.response.ZoneResponse;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
 import org.apache.cloudstack.lb.dao.ApplicationLoadBalancerRuleDao;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
-import org.springframework.stereotype.Component;
 
 import com.cloud.api.query.dao.AccountJoinDao;
 import com.cloud.api.query.dao.AffinityGroupJoinDao;
@@ -93,9 +97,6 @@ import com.cloud.api.query.vo.StoragePoolJoinVO;
 import com.cloud.api.query.vo.UserAccountJoinVO;
 import com.cloud.api.query.vo.UserVmJoinVO;
 import com.cloud.api.query.vo.VolumeJoinVO;
-import com.cloud.async.AsyncJob;
-import com.cloud.async.AsyncJobManager;
-import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.capacity.CapacityVO;
 import com.cloud.capacity.dao.CapacityDao;
 import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity;
@@ -129,8 +130,6 @@ import com.cloud.host.HostVO;
 import com.cloud.host.dao.HostDao;
 import com.cloud.host.dao.HostDetailsDao;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.network.dao.AccountGuestVlanMapDao;
-import com.cloud.network.dao.AccountGuestVlanMapVO;
 import com.cloud.network.IpAddress;
 import com.cloud.network.Network;
 import com.cloud.network.Network.Capability;
@@ -1287,12 +1286,7 @@ public class ApiDBUtils {
         return _vpcOfferingDao.findById(offeringId);
     }
 
-
-    public static AsyncJob findAsyncJobById(long jobId){
-        return _asyncJobDao.findById(jobId);
-    }
-
-    public static String findJobInstanceUuid(AsyncJob job){
+    public static String findJobInstanceUuid(AsyncJob job) {
         if ( job == null )
             return null;
         String jobInstanceId = null;
@@ -1605,7 +1599,7 @@ public class ApiDBUtils {
         return _jobJoinDao.newAsyncJobResponse(ve);
     }
 
-    public static AsyncJobJoinVO newAsyncJobView(AsyncJob e){
+    public static AsyncJobJoinVO newAsyncJobView(AsyncJob e) {
         return _jobJoinDao.newAsyncJobView(e);
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java
index afcfda3..a1e467e 100755
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@ -33,6 +33,9 @@ import java.util.regex.Matcher;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.acl.InfrastructureEntity;
 import org.apache.cloudstack.acl.SecurityChecker.AccessType;
@@ -52,11 +55,10 @@ import org.apache.cloudstack.api.Validate;
 import org.apache.cloudstack.api.command.user.event.ArchiveEventsCmd;
 import org.apache.cloudstack.api.command.user.event.DeleteEventsCmd;
 import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 
 import com.cloud.async.AsyncJobExecutionContext;
-import com.cloud.async.AsyncJobManager;
 import com.cloud.dao.EntityManager;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.user.Account;
@@ -149,7 +151,7 @@ public class ApiDispatcher {
                 if (queueSizeLimit != null) {
                 	if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) {
                 		// if we are not within async-execution context, enqueue the command
-                		_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
+                        _asyncMgr.syncAsyncJobExecution((AsyncJob)asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
                 		return;
                 	}
                 } else {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiResponseHelper.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiResponseHelper.java b/server/src/com/cloud/api/ApiResponseHelper.java
index d5960ab..0934655 100755
--- a/server/src/com/cloud/api/ApiResponseHelper.java
+++ b/server/src/com/cloud/api/ApiResponseHelper.java
@@ -34,6 +34,9 @@ import java.util.TimeZone;
 
 import javax.inject.Inject;
 
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.acl.ControlledEntity.ACLType;
 import org.apache.cloudstack.affinity.AffinityGroup;
@@ -59,7 +62,6 @@ import org.apache.cloudstack.api.response.ConfigurationResponse;
 import org.apache.cloudstack.api.response.ControlledEntityResponse;
 import org.apache.cloudstack.api.response.ControlledViewEntityResponse;
 import org.apache.cloudstack.api.response.CounterResponse;
-import org.apache.cloudstack.api.response.CreateCmdResponse;
 import org.apache.cloudstack.api.response.DiskOfferingResponse;
 import org.apache.cloudstack.api.response.DomainResponse;
 import org.apache.cloudstack.api.response.DomainRouterResponse;
@@ -134,14 +136,13 @@ import org.apache.cloudstack.api.response.VpcOfferingResponse;
 import org.apache.cloudstack.api.response.VpcResponse;
 import org.apache.cloudstack.api.response.VpnUsersResponse;
 import org.apache.cloudstack.api.response.ZoneResponse;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.network.lb.ApplicationLoadBalancerRule;
 import org.apache.cloudstack.region.Region;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.usage.Usage;
 import org.apache.cloudstack.usage.UsageService;
 import org.apache.cloudstack.usage.UsageTypes;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import com.cloud.api.query.ViewResponseHelper;
 import com.cloud.api.query.vo.AccountJoinVO;
@@ -163,8 +164,6 @@ import com.cloud.api.query.vo.StoragePoolJoinVO;
 import com.cloud.api.query.vo.UserAccountJoinVO;
 import com.cloud.api.query.vo.UserVmJoinVO;
 import com.cloud.api.query.vo.VolumeJoinVO;
-import com.cloud.api.response.ApiResponseSerializer;
-import com.cloud.async.AsyncJob;
 import com.cloud.capacity.Capacity;
 import com.cloud.capacity.CapacityVO;
 import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity;
@@ -296,8 +295,8 @@ public class ApiResponseHelper implements ResponseGenerator {
 
     public final Logger s_logger = Logger.getLogger(ApiResponseHelper.class);
     private static final DecimalFormat s_percentFormat = new DecimalFormat("##.##");
-    @Inject private EntityManager _entityMgr = null;
-    @Inject private UsageService _usageSvc = null;
+    @Inject private final EntityManager _entityMgr = null;
+    @Inject private final UsageService _usageSvc = null;
     @Inject NetworkModel _ntwkModel;
 
     @Override
@@ -434,7 +433,7 @@ public class ApiResponseHelper implements ResponseGenerator {
             DataCenter zone = ApiDBUtils.findZoneById(volume.getDataCenterId());
             if (zone != null) {
             	snapshotResponse.setZoneName(zone.getName());
-            	snapshotResponse.setZoneType(zone.getNetworkType().toString());                
+            	snapshotResponse.setZoneType(zone.getNetworkType().toString());
             }
         }
         snapshotResponse.setCreated(snapshot.getCreated());
@@ -827,7 +826,7 @@ public class ApiResponseHelper implements ResponseGenerator {
                 capacityResponse.setCapacityType(capacity.getCapacityType());
                 capacityResponse.setCapacityUsed(capacity.getUsedCapacity());
                 if (capacity.getCapacityType() == Capacity.CAPACITY_TYPE_CPU) {
-                    capacityResponse.setCapacityTotal(new Long((long) (capacity.getTotalCapacity())));
+                    capacityResponse.setCapacityTotal(new Long((capacity.getTotalCapacity())));
                 } else if (capacity.getCapacityType() == Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED) {
                     List<SummedCapacity> c = ApiDBUtils.findNonSharedStorageForClusterPodZone(null, pod.getId(), null);
                     capacityResponse.setCapacityTotal(capacity.getTotalCapacity() - c.get(0).getTotalCapacity());
@@ -1820,12 +1819,6 @@ public class ApiResponseHelper implements ResponseGenerator {
 
     }
 
-    @Override
-    public String toSerializedString(CreateCmdResponse response, String responseType) {
-        return ApiResponseSerializer.toSerializedString(response, responseType);
-    }
-
-    @Override
     public AsyncJobResponse createAsyncJobResponse(AsyncJob job) {
         AsyncJobJoinVO vJob = ApiDBUtils.newAsyncJobView(job);
         return ApiDBUtils.newAsyncJobResponse(vJob);
@@ -2375,7 +2368,7 @@ public class ApiResponseHelper implements ResponseGenerator {
         }
 
         // populate network offering information
-        NetworkOffering networkOffering = (NetworkOffering) ApiDBUtils.findNetworkOfferingById(network.getNetworkOfferingId());
+        NetworkOffering networkOffering = ApiDBUtils.findNetworkOfferingById(network.getNetworkOfferingId());
         if (networkOffering != null) {
             response.setNetworkOfferingId(networkOffering.getUuid());
             response.setNetworkOfferingName(networkOffering.getName());
@@ -3665,6 +3658,7 @@ public class ApiResponseHelper implements ResponseGenerator {
         return response;
     }
 
+    @Override
     public NicSecondaryIpResponse createSecondaryIPToNicResponse(NicSecondaryIp result) {
         NicSecondaryIpResponse response = new NicSecondaryIpResponse();
         NicVO nic = _entityMgr.findById(NicVO.class, result.getNicId());
@@ -3677,6 +3671,7 @@ public class ApiResponseHelper implements ResponseGenerator {
         return response;
     }
 
+    @Override
     public NicResponse createNicResponse(Nic result) {
         NicResponse response = new NicResponse();
         NetworkVO network = _entityMgr.findById(NetworkVO.class, result.getNetworkId());

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index 0f375b8..0cd7201 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -109,17 +109,20 @@ import org.apache.cloudstack.api.command.user.vm.ListVMsCmd;
 import org.apache.cloudstack.api.command.user.vmgroup.ListVMGroupsCmd;
 import org.apache.cloudstack.api.command.user.volume.ListVolumesCmd;
 import org.apache.cloudstack.api.command.user.zone.ListZonesByCmd;
+import org.apache.cloudstack.api.response.AsyncJobResponse;
+import org.apache.cloudstack.api.response.CreateCmdResponse;
 import org.apache.cloudstack.api.response.ExceptionResponse;
 import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.AsyncJobVO;
 import org.apache.cloudstack.region.RegionManager;
 
 import com.cloud.api.response.ApiResponseSerializer;
-import com.cloud.async.AsyncJob;
-import com.cloud.async.AsyncJobManager;
-import com.cloud.async.AsyncJobVO;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.ConfigurationVO;
 import com.cloud.configuration.dao.ConfigurationDao;
+import com.cloud.dao.EntityManager;
 import com.cloud.domain.Domain;
 import com.cloud.domain.DomainVO;
 import com.cloud.event.ActionEventUtils;
@@ -149,7 +152,7 @@ import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
 
-@Component 
+@Component
 public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiServerService {
     private static final Logger s_logger = Logger.getLogger(ApiServer.class.getName());
     private static final Logger s_accessLogger = Logger.getLogger("apiserver." + ApiServer.class.getName());
@@ -166,6 +169,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
     @Inject List<PluggableService> _pluggableServices;
     @Inject List<APIChecker> _apiAccessCheckers;
 
+    @Inject
+    private EntityManager _entityMgr;
+
     @Inject private final RegionManager _regionMgr = null;
 
     private static int _workerCount = 0;
@@ -444,6 +450,24 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
         return response;
     }
 
+    private String getBaseAsyncResponse(long jobId, BaseAsyncCmd cmd) {
+        AsyncJobResponse response = new AsyncJobResponse();
+
+        AsyncJob job = _entityMgr.findById(AsyncJob.class, jobId);
+        response.setJobId(job.getUuid());
+        response.setResponseName(cmd.getCommandName());
+        return ApiResponseSerializer.toSerializedString(response, cmd.getResponseType());
+    }
+
+    private String getBaseAsyncCreateResponse(long jobId, BaseAsyncCreateCmd cmd, String objectUuid) {
+        CreateCmdResponse response = new CreateCmdResponse();
+        AsyncJob job = _entityMgr.findById(AsyncJob.class, jobId);
+        response.setJobId(job.getUuid());
+        response.setId(objectUuid);
+        response.setResponseName(cmd.getCommandName());
+        return ApiResponseSerializer.toSerializedString(response, cmd.getResponseType());
+    }
+
     private String queueCommand(BaseCmd cmdObj, Map<String, String> params) throws Exception {
         UserContext ctx = UserContext.current();
         Long callerUserId = ctx.getCallerUserId();
@@ -494,7 +518,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
 
             Long instanceId = (objectId == null) ? asyncCmd.getInstanceId() : objectId;
             AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(),
-                    ApiGsonHelper.getBuilder().create().toJson(params), instanceId, 
+                    ApiGsonHelper.getBuilder().create().toJson(params), instanceId,
                     asyncCmd.getInstanceType() != null ? asyncCmd.getInstanceType().toString() : null);
 
             long jobId = _asyncMgr.submitAsyncJob(job);
@@ -507,11 +531,11 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
 
             if (objectId != null) {
                 String objUuid = (objectUuid == null) ? objectId.toString() : objectUuid;
-                return ((BaseAsyncCreateCmd) asyncCmd).getResponse(jobId, objUuid);
+                return getBaseAsyncCreateResponse(jobId, (BaseAsyncCreateCmd)asyncCmd, objUuid);
+            } else {
+                SerializationContext.current().setUuidTranslation(true);
+                return getBaseAsyncResponse(jobId, asyncCmd);
             }
-
-            SerializationContext.current().setUuidTranslation(true);
-            return ApiResponseSerializer.toSerializedString(asyncCmd.getResponse(jobId), asyncCmd.getResponseType());
         } else {
             _dispatcher.dispatch(cmdObj, params);
 
@@ -564,7 +588,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
                 if (job.getInstanceId() == null) {
                     continue;
                 }
-                String instanceUuid = ApiDBUtils.findJobInstanceUuid(job);
+                String instanceUuid = job.getUuid();
                 if (instanceUuid != null) {
                     objectJobMap.put(instanceUuid, job);
                 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java b/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java
index f7a2c8c..756425f 100644
--- a/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java
+++ b/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java
@@ -16,12 +16,10 @@
 // under the License.
 package com.cloud.api.query.dao;
 
-import java.util.List;
-
 import org.apache.cloudstack.api.response.AsyncJobResponse;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
 
 import com.cloud.api.query.vo.AsyncJobJoinVO;
-import com.cloud.async.AsyncJob;
 import com.cloud.utils.db.GenericDao;
 
 public interface AsyncJobJoinDao extends GenericDao<AsyncJobJoinVO, Long> {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java b/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java
index fb5695b..5090475 100644
--- a/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java
+++ b/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java
@@ -22,15 +22,15 @@ import java.util.List;
 import javax.ejb.Local;
 
 import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
 
-import com.cloud.api.ApiSerializerHelper;
-import com.cloud.api.SerializationContext;
-import com.cloud.api.query.vo.AsyncJobJoinVO;
-import com.cloud.async.AsyncJob;
 import org.apache.cloudstack.api.ResponseObject;
 import org.apache.cloudstack.api.response.AsyncJobResponse;
-import org.springframework.stereotype.Component;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
 
+import com.cloud.api.ApiSerializerHelper;
+import com.cloud.api.SerializationContext;
+import com.cloud.api.query.vo.AsyncJobJoinVO;
 import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
@@ -40,7 +40,7 @@ import com.cloud.utils.db.SearchCriteria;
 public class AsyncJobJoinDaoImpl extends GenericDaoBase<AsyncJobJoinVO, Long> implements AsyncJobJoinDao {
     public static final Logger s_logger = Logger.getLogger(AsyncJobJoinDaoImpl.class);
 
-    private SearchBuilder<AsyncJobJoinVO> jobIdSearch;
+    private final SearchBuilder<AsyncJobJoinVO> jobIdSearch;
 
     protected AsyncJobJoinDaoImpl() {
 
@@ -49,7 +49,7 @@ public class AsyncJobJoinDaoImpl extends GenericDaoBase<AsyncJobJoinVO, Long> im
         jobIdSearch.and("id", jobIdSearch.entity().getId(), SearchCriteria.Op.EQ);
         jobIdSearch.done();
 
-        this._count = "select count(distinct id) from async_job_view WHERE ";
+        _count = "select count(distinct id) from async_job_view WHERE ";
     }
 
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobConstants.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java
deleted file mode 100644
index 6081d0c..0000000
--- a/server/src/com/cloud/async/AsyncJobConstants.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.async;
-
-public interface AsyncJobConstants {
-	public static final int STATUS_IN_PROGRESS = 0;
-	public static final int STATUS_SUCCEEDED = 1;
-	public static final int STATUS_FAILED = 2;
-	
-	public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
-	public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
-	
-	public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
-	
-	// Although we may have detailed masks for each individual wakeup event, i.e.
-	// periodical timer, matched topic from message bus, it seems that we don't
-	// need to distinguish them to such level. Therefore, only one wakeup signal
-	// is defined
-	public static final int SIGNAL_MASK_WAKEUP = 1;
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java
deleted file mode 100644
index ed03d33..0000000
--- a/server/src/com/cloud/async/AsyncJobDispatcher.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.async;
-
-import com.cloud.utils.component.Adapter;
-
-public interface AsyncJobDispatcher extends Adapter {
-	void runJob(AsyncJob job);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index 988f4e6..e6efd03 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -18,7 +18,13 @@ package com.cloud.async;
 
 import javax.inject.Inject;
 
-import com.cloud.async.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.SyncQueueItem;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+
 import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
@@ -26,7 +32,7 @@ import com.cloud.serializer.SerializerHelper;
 import com.cloud.utils.component.ComponentContext;
 
 public class AsyncJobExecutionContext  {
-	private AsyncJob _job;
+    private AsyncJob _job;
 	
 	@Inject private AsyncJobManager _jobMgr;
 	@Inject private AsyncJobJoinMapDao _joinMapDao;
@@ -36,7 +42,7 @@ public class AsyncJobExecutionContext  {
 	public AsyncJobExecutionContext() {
 	}
 	
-	public AsyncJobExecutionContext(AsyncJob job) {
+    public AsyncJobExecutionContext(AsyncJob job) {
 		_job = job;
 	}
 	
@@ -48,7 +54,7 @@ public class AsyncJobExecutionContext  {
 		_job.setSyncSource(null);
 	}
 	
-	public AsyncJob getJob() {
+    public AsyncJob getJob() {
 		if(_job == null) {
 			_job = _jobMgr.getPseudoJob();
 		}
@@ -56,7 +62,7 @@ public class AsyncJobExecutionContext  {
 		return _job;
 	}
 	
-	public void setJob(AsyncJob job) {
+    public void setJob(AsyncJob job) {
 		_job = job;
 	}
 	
@@ -75,7 +81,7 @@ public class AsyncJobExecutionContext  {
     	_jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId);
     }
 	
-	public void logJobJournal(AsyncJob.JournalType journalType, String 
+    public void logJobJournal(AsyncJob.JournalType journalType, String
 	    journalText, String journalObjJson) {
 		assert(_job != null);
 		_jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson);
@@ -89,14 +95,14 @@ public class AsyncJobExecutionContext  {
     public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher,
     		String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) {
     	assert(_job != null);
-    	_jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, 
+    	_jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus,
     		wakeupIntervalInMilliSeconds, timeoutInMilliSeconds);
     }
     
     //
 	// check failure exception before we disjoin the worker job
 	// TODO : it is ugly and this will become unnecessary after we switch to full-async mode
-	// 
+	//
     public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
 		ConcurrentOperationException, ResourceUnavailableException {
     	assert(_job != null);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobMBean.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobMBean.java b/server/src/com/cloud/async/AsyncJobMBean.java
deleted file mode 100644
index 15d6595..0000000
--- a/server/src/com/cloud/async/AsyncJobMBean.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.async;
-
-public interface AsyncJobMBean {
-	public long getAccountId();
-	public long getUserId();
-	public String getCmd();
-	public String getCmdInfo();
-	public String getStatus();
-	public int getProcessStatus();
-	public int getResultCode();
-	public String getResult();
-	public String getInstanceType();
-	public String getInstanceId();
-	public String getInitMsid();
-	public String getCreateTime();
-	public String getLastUpdateTime();
-	public String getLastPollTime();
-	public String getSyncQueueId();
-	public String getSyncQueueContentType();
-	public String getSyncQueueContentId();
-}


Mime
View raw message