Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ADCE2DF4F for ; Wed, 15 May 2013 17:13:04 +0000 (UTC) Received: (qmail 9521 invoked by uid 500); 15 May 2013 17:13:04 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 9498 invoked by uid 500); 15 May 2013 17:13:04 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 9360 invoked by uid 99); 15 May 2013 17:13:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 May 2013 17:13:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9B75DAFDC; Wed, 15 May 2013 17:13:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ahuang@apache.org To: commits@cloudstack.apache.org Date: Wed, 15 May 2013 17:13:06 -0000 Message-Id: <09afe136a58145e99b507a4f77b0545a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/12] Isolated the job queue work in its own project http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java new file mode 100644 index 0000000..7c57076 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueManagerImpl.java @@ -0,0 +1,258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import javax.inject.Inject; +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao; +import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.component.ManagerBase; +import com.cloud.utils.db.DB; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.exception.CloudRuntimeException; + +public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager { + public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName()); + + @Inject private SyncQueueDao _syncQueueDao; + @Inject private SyncQueueItemDao _syncQueueItemDao; + + @Override + @DB + public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) { + Transaction txn = Transaction.currentTxn(); + try { + txn.start(); + + _syncQueueDao.ensureQueue(syncObjType, syncObjId); + SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId); + if(queueVO == null) + throw new CloudRuntimeException("Unable to queue item into DB, DB is full?"); + + queueVO.setQueueSizeLimit(queueSizeLimit); + _syncQueueDao.update(queueVO.getId(), queueVO); + + Date dt = DateUtil.currentGMTTime(); + SyncQueueItemVO item = new SyncQueueItemVO(); + item.setQueueId(queueVO.getId()); + item.setContentType(itemType); + item.setContentId(itemId); + item.setCreated(dt); + + _syncQueueItemDao.persist(item); + txn.commit(); + + return queueVO; + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txn.rollback(); + } + return null; + } + + @Override + @DB + public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true); + if(queueVO == null) { + s_logger.error("Sync queue(id: " + queueId + ") does not exist"); + txt.commit(); + return null; + } + + if(queueReadyToProcess(queueVO)) { + SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId()); + if(itemVO != null) { + Long processNumber = queueVO.getLastProcessNumber(); + if(processNumber == null) + processNumber = new Long(1); + else + processNumber = processNumber + 1; + Date dt = DateUtil.currentGMTTime(); + queueVO.setLastProcessNumber(processNumber); + queueVO.setLastUpdated(dt); + queueVO.setQueueSize(queueVO.getQueueSize() + 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + + itemVO.setLastProcessMsid(msid); + itemVO.setLastProcessNumber(processNumber); + itemVO.setLastProcessTime(dt); + _syncQueueItemDao.update(itemVO.getId(), itemVO); + + txt.commit(); + return itemVO; + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("Sync queue (" + queueId + ") is currently empty"); + } + } else { + if(s_logger.isDebugEnabled()) + s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")"); + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + + return null; + } + + @Override + @DB + public List dequeueFromAny(Long msid, int maxItems) { + + List resultList = new ArrayList(); + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + List l = _syncQueueItemDao.getNextQueueItems(maxItems); + if(l != null && l.size() > 0) { + for(SyncQueueItemVO item : l) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true); + SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true); + if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { + Long processNumber = queueVO.getLastProcessNumber(); + if(processNumber == null) + processNumber = new Long(1); + else + processNumber = processNumber + 1; + + Date dt = DateUtil.currentGMTTime(); + queueVO.setLastProcessNumber(processNumber); + queueVO.setLastUpdated(dt); + queueVO.setQueueSize(queueVO.getQueueSize() + 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + + itemVO.setLastProcessMsid(msid); + itemVO.setLastProcessNumber(processNumber); + itemVO.setLastProcessTime(dt); + _syncQueueItemDao.update(item.getId(), itemVO); + + resultList.add(item); + } + } + } + txt.commit(); + return resultList; + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + return null; + } + + @Override + @DB + public void purgeItem(long queueItemId) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + + _syncQueueItemDao.expunge(itemVO.getId()); + + // if item is active, reset queue information + if (itemVO.getLastProcessMsid() != null) { + queueVO.setLastUpdated(DateUtil.currentGMTTime()); + // decrement the count + assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!"; + queueVO.setQueueSize(queueVO.getQueueSize() - 1); + _syncQueueDao.update(queueVO.getId(), queueVO); + } + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + } + + @Override + @DB + public void returnItem(long queueItemId) { + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + + SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + + itemVO.setLastProcessMsid(null); + itemVO.setLastProcessNumber(null); + itemVO.setLastProcessTime(null); + _syncQueueItemDao.update(queueItemId, itemVO); + + queueVO.setLastUpdated(DateUtil.currentGMTTime()); + _syncQueueDao.update(queueVO.getId(), queueVO); + } + txt.commit(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } + } + + @Override + public List getActiveQueueItems(Long msid, boolean exclusive) { + return _syncQueueItemDao.getActiveQueueItems(msid, exclusive); + } + + @Override + public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { + return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive); + } + + private boolean queueReadyToProcess(SyncQueueVO queueVO) { + return true; + + // + // TODO + // + // Need to disable concurrency disable at queue level due to the need to support + // job wake-up dispatching task + // + // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler + // + + // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit(); + } + + @Override + public void purgeAsyncJobQueueItemId(long asyncJobId) { + Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType); + if (itemId != null) { + purgeItem(itemId); + } + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java new file mode 100644 index 0000000..cad3462 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/SyncQueueVO.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.framework.jobs; + +import org.apache.cloudstack.api.InternalIdentity; + +import java.util.Date; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + +@Entity +@Table(name="sync_queue") +public class SyncQueueVO implements InternalIdentity { + + @Id + @GeneratedValue(strategy=GenerationType.IDENTITY) + @Column(name="id") + private Long id; + + @Column(name="sync_objtype") + + private String syncObjType; + + @Column(name="sync_objid") + private Long syncObjId; + + @Column(name="queue_proc_number") + private Long lastProcessNumber; + + @Column(name="created") + @Temporal(TemporalType.TIMESTAMP) + private Date created; + + @Column(name="last_updated") + @Temporal(TemporalType.TIMESTAMP) + private Date lastUpdated; + + @Column(name="queue_size") + private long queueSize = 0; + + @Column(name="queue_size_limit") + private long queueSizeLimit = 0; + + public long getId() { + return id; + } + + public String getSyncObjType() { + return syncObjType; + } + + public void setSyncObjType(String syncObjType) { + this.syncObjType = syncObjType; + } + + public Long getSyncObjId() { + return syncObjId; + } + + public void setSyncObjId(Long syncObjId) { + this.syncObjId = syncObjId; + } + + public Long getLastProcessNumber() { + return lastProcessNumber; + } + + public void setLastProcessNumber(Long number) { + lastProcessNumber = number; + } + + public Date getCreated() { + return created; + } + + public void setCreated(Date created) { + this.created = created; + } + + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("SyncQueueVO {id:").append(getId()); + sb.append(", syncObjType: ").append(getSyncObjType()); + sb.append(", syncObjId: ").append(getSyncObjId()); + sb.append(", lastProcessNumber: ").append(getLastProcessNumber()); + sb.append(", lastUpdated: ").append(getLastUpdated()); + sb.append(", created: ").append(getCreated()); + sb.append(", count: ").append(getQueueSize()); + sb.append("}"); + return sb.toString(); + } + + public long getQueueSize() { + return queueSize; + } + + public void setQueueSize(long queueSize) { + this.queueSize = queueSize; + } + + public long getQueueSizeLimit() { + return queueSizeLimit; + } + + public void setQueueSizeLimit(long queueSizeLimit) { + this.queueSizeLimit = queueSizeLimit; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java new file mode 100644 index 0000000..ba28495 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.util.Date; +import java.util.List; + +import org.apache.cloudstack.framework.jobs.AsyncJobVO; + +import com.cloud.utils.db.GenericDao; + +public interface AsyncJobDao extends GenericDao { + AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId); + List findInstancePendingAsyncJobs(String instanceType, Long accountId); + + AsyncJobVO findPseudoJob(long threadId, long msid); + void cleanupPseduoJobs(long msid); + + List getExpiredJobs(Date cutTime, int limit); + void resetJobProcess(long msid, int jobResultCode, String jobResultMessage); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java new file mode 100644 index 0000000..9559b9c --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobVO; + +import com.cloud.utils.db.DB; +import com.cloud.utils.db.Filter; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.SearchCriteria.Op; + +public class AsyncJobDaoImpl extends GenericDaoBase implements AsyncJobDao { + private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName()); + + private final SearchBuilder pendingAsyncJobSearch; + private final SearchBuilder pendingAsyncJobsSearch; + private final SearchBuilder expiringAsyncJobSearch; + private final SearchBuilder pseudoJobSearch; + private final SearchBuilder pseudoJobCleanupSearch; + + public AsyncJobDaoImpl() { + pendingAsyncJobSearch = createSearchBuilder(); + pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), + SearchCriteria.Op.EQ); + pendingAsyncJobSearch.done(); + + pendingAsyncJobsSearch = createSearchBuilder(); + pendingAsyncJobsSearch.and("instanceType", pendingAsyncJobsSearch.entity().getInstanceType(), + SearchCriteria.Op.EQ); + pendingAsyncJobsSearch.and("accountId", pendingAsyncJobsSearch.entity().getAccountId(), + SearchCriteria.Op.EQ); + pendingAsyncJobsSearch.and("status", pendingAsyncJobsSearch.entity().getStatus(), + SearchCriteria.Op.EQ); + pendingAsyncJobsSearch.done(); + + expiringAsyncJobSearch = createSearchBuilder(); + expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), + SearchCriteria.Op.LTEQ); + expiringAsyncJobSearch.done(); + + pseudoJobSearch = createSearchBuilder(); + pseudoJobSearch.and("jobDispatcher", pseudoJobSearch.entity().getDispatcher(), Op.EQ); + pseudoJobSearch.and("instanceType", pseudoJobSearch.entity().getInstanceType(), Op.EQ); + pseudoJobSearch.and("instanceId", pseudoJobSearch.entity().getInstanceId(), Op.EQ); + pseudoJobSearch.done(); + + pseudoJobCleanupSearch = createSearchBuilder(); + pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ); + pseudoJobCleanupSearch.done(); + } + + public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) { + SearchCriteria sc = pendingAsyncJobSearch.create(); + sc.setParameters("instanceType", instanceType); + sc.setParameters("instanceId", instanceId); + sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS); + + List l = listIncludingRemovedBy(sc); + if(l != null && l.size() > 0) { + if(l.size() > 1) { + s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job"); + } + + return l.get(0); + } + return null; + } + + public List findInstancePendingAsyncJobs(String instanceType, Long accountId) { + SearchCriteria sc = pendingAsyncJobsSearch.create(); + sc.setParameters("instanceType", instanceType); + + if (accountId != null) { + sc.setParameters("accountId", accountId); + } + sc.setParameters("status", AsyncJobConstants.STATUS_IN_PROGRESS); + + return listBy(sc); + } + + public AsyncJobVO findPseudoJob(long threadId, long msid) { + SearchCriteria sc = pseudoJobSearch.create(); + sc.setParameters("jobDispatcher", AsyncJobConstants.JOB_DISPATCHER_PSEUDO); + sc.setParameters("instanceType", AsyncJobConstants.PSEUDO_JOB_INSTANCE_TYPE); + sc.setParameters("instanceId", threadId); + + List result = listBy(sc); + if(result != null && result.size() > 0) { + assert(result.size() == 1); + return result.get(0); + } + + return null; + } + + public void cleanupPseduoJobs(long msid) { + SearchCriteria sc = pseudoJobCleanupSearch.create(); + sc.setParameters("initMsid", msid); + this.expunge(sc); + } + + public List getExpiredJobs(Date cutTime, int limit) { + SearchCriteria sc = expiringAsyncJobSearch.create(); + sc.setParameters("created", cutTime); + Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit); + return listIncludingRemovedBy(sc, filter); + } + + @DB + public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) { + String sql = "UPDATE async_job SET job_status=" + AsyncJobConstants.STATUS_FAILED + ", job_result_code=" + jobResultCode + + ", job_result='" + jobResultMessage + "' where job_status=0 AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))"; + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + pstmt.setLong(1, msid); + pstmt.setLong(2, msid); + pstmt.execute(); + } catch (SQLException e) { + s_logger.warn("Unable to reset job status for management server " + msid, e); + } catch (Throwable e) { + s_logger.warn("Unable to reset job status for management server " + msid, e); + } + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java new file mode 100644 index 0000000..acf1324 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.util.List; + +import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO; + +import com.cloud.utils.db.GenericDao; + +public interface AsyncJobJoinMapDao extends GenericDao { + + Long joinJob(long jobId, long joinJobId, long joinMsid, + long wakeupIntervalMs, long expirationMs, + Long syncSourceId, String wakeupHandler, String wakeupDispatcher); + void disjoinJob(long jobId, long joinedJobId); + void disjoinAllJobs(long jobId); + + AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId); + List listJoinRecords(long jobId); + + void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid); + + List wakeupScan(); + List wakeupByJoinedJobCompletion(long joinedJobId); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java new file mode 100644 index 0000000..0b41953 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.UpdateBuilder; +import com.cloud.utils.db.SearchCriteria.Op; + +public class AsyncJobJoinMapDaoImpl extends GenericDaoBase implements AsyncJobJoinMapDao { + public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class); + + private final SearchBuilder RecordSearch; + private final SearchBuilder RecordSearchByOwner; + private final SearchBuilder CompleteJoinSearch; + private final SearchBuilder WakeupSearch; + + public AsyncJobJoinMapDaoImpl() { + RecordSearch = createSearchBuilder(); + RecordSearch.and("jobId", RecordSearch.entity().getJobId(), Op.EQ); + RecordSearch.and("joinJobId", RecordSearch.entity().getJoinJobId(), Op.EQ); + RecordSearch.done(); + + RecordSearchByOwner = createSearchBuilder(); + RecordSearchByOwner.and("jobId", RecordSearchByOwner.entity().getJobId(), Op.EQ); + RecordSearchByOwner.done(); + + CompleteJoinSearch = createSearchBuilder(); + CompleteJoinSearch.and("joinJobId", CompleteJoinSearch.entity().getJoinJobId(), Op.EQ); + CompleteJoinSearch.done(); + + WakeupSearch = createSearchBuilder(); + WakeupSearch.and("nextWakeupTime", WakeupSearch.entity().getNextWakeupTime(), Op.LT); + WakeupSearch.and("expiration", WakeupSearch.entity().getExpiration(), Op.GT); + WakeupSearch.and("joinStatus", WakeupSearch.entity().getJoinStatus(), Op.EQ); + WakeupSearch.done(); + } + + public Long joinJob(long jobId, long joinJobId, long joinMsid, + long wakeupIntervalMs, long expirationMs, + Long syncSourceId, String wakeupHandler, String wakeupDispatcher) { + + AsyncJobJoinMapVO record = new AsyncJobJoinMapVO(); + record.setJobId(jobId); + record.setJoinJobId(joinJobId); + record.setJoinMsid(joinMsid); + record.setJoinStatus(AsyncJobConstants.STATUS_IN_PROGRESS); + record.setSyncSourceId(syncSourceId); + record.setWakeupInterval(wakeupIntervalMs / 1000); // convert millisecond to second + record.setWakeupHandler(wakeupHandler); + record.setWakeupDispatcher(wakeupDispatcher); + if(wakeupHandler != null) { + record.setNextWakeupTime(new Date(DateUtil.currentGMTTime().getTime() + wakeupIntervalMs)); + record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs)); + } + + this.persist(record); + return record.getId(); + } + + public void disjoinJob(long jobId, long joinedJobId) { + SearchCriteria sc = RecordSearch.create(); + sc.setParameters("jobId", jobId); + sc.setParameters("joinJobId", joinedJobId); + + this.expunge(sc); + } + + public void disjoinAllJobs(long jobId) { + SearchCriteria sc = RecordSearchByOwner.create(); + sc.setParameters("jobId", jobId); + + this.expunge(sc); + } + + public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) { + SearchCriteria sc = RecordSearch.create(); + sc.setParameters("jobId", jobId); + sc.setParameters("joinJobId", joinJobId); + + List result = this.listBy(sc); + if(result != null && result.size() > 0) { + assert(result.size() == 1); + return result.get(0); + } + + return null; + } + + public List listJoinRecords(long jobId) { + SearchCriteria sc = RecordSearchByOwner.create(); + sc.setParameters("jobId", jobId); + + return this.listBy(sc); + } + + public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) { + AsyncJobJoinMapVO record = createForUpdate(); + record.setJoinStatus(joinStatus); + record.setJoinResult(joinResult); + record.setCompleteMsid(completeMsid); + record.setLastUpdated(DateUtil.currentGMTTime()); + + UpdateBuilder ub = getUpdateBuilder(record); + + SearchCriteria sc = CompleteJoinSearch.create(); + sc.setParameters("joinJobId", joinJobId); + update(ub, sc, null); + } + + public List wakeupScan() { + List standaloneList = new ArrayList(); + + Date cutDate = DateUtil.currentGMTTime(); + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + txn.start(); + + // + // performance sensitive processing, do it in plain SQL + // + String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " + + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + + "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + standaloneList.add(rs.getLong(1)); + } + rs.close(); + pstmt.close(); + + // update for next wake-up + sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + pstmt.close(); + + txn.commit(); + } catch (SQLException e) { + s_logger.error("Unexpected exception", e); + } + + return standaloneList; + } + + public List wakeupByJoinedJobCompletion(long joinedJobId) { + List standaloneList = new ArrayList(); + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + txn.start(); + + // + // performance sensitive processing, do it in plain SQL + // + String sql = "UPDATE async_job SET job_pending_signals=1 WHERE id IN " + + "(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setLong(1, joinedJobId); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + + "(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)"; + pstmt = txn.prepareStatement(sql); + pstmt.setLong(1, joinedJobId); + pstmt.executeUpdate(); + pstmt.close(); + + sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; + pstmt = txn.prepareStatement(sql); + pstmt.setLong(1, joinedJobId); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + standaloneList.add(rs.getLong(1)); + } + rs.close(); + pstmt.close(); + + txn.commit(); + } catch (SQLException e) { + s_logger.error("Unexpected exception", e); + } + + return standaloneList; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java new file mode 100644 index 0000000..e8d8287 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.util.List; + +import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO; + +import com.cloud.utils.db.GenericDao; + +public interface AsyncJobJournalDao extends GenericDao { + List getJobJournal(long jobId); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java new file mode 100644 index 0000000..9d02307 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.util.List; + +import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO; + +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Op; + +public class AsyncJobJournalDaoImpl extends GenericDaoBase implements AsyncJobJournalDao { + + private final SearchBuilder JobJournalSearch; + + public AsyncJobJournalDaoImpl() { + JobJournalSearch = createSearchBuilder(); + JobJournalSearch.and("jobId", JobJournalSearch.entity().getJobId(), Op.EQ); + JobJournalSearch.done(); + } + + @Override + public List getJobJournal(long jobId) { + SearchCriteria sc = JobJournalSearch.create(); + sc.setParameters("jobId", jobId); + + return this.listBy(sc); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java new file mode 100644 index 0000000..f45245a --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDao.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import org.apache.cloudstack.framework.jobs.SyncQueueVO; + +import com.cloud.utils.db.GenericDao; + +public interface SyncQueueDao extends GenericDao{ + public void ensureQueue(String syncObjType, long syncObjId); + public SyncQueueVO find(String syncObjType, long syncObjId); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java new file mode 100644 index 0000000..05e2a73 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.framework.jobs.dao; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Date; +import java.util.TimeZone; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.SyncQueueVO; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; + +public class SyncQueueDaoImpl extends GenericDaoBase implements SyncQueueDao { + private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName()); + + SearchBuilder TypeIdSearch = createSearchBuilder(); + + public SyncQueueDaoImpl() { + super(); + TypeIdSearch = createSearchBuilder(); + TypeIdSearch.and("syncObjType", TypeIdSearch.entity().getSyncObjType(), SearchCriteria.Op.EQ); + TypeIdSearch.and("syncObjId", TypeIdSearch.entity().getSyncObjId(), SearchCriteria.Op.EQ); + TypeIdSearch.done(); + } + + @Override + public void ensureQueue(String syncObjType, long syncObjId) { + Date dt = DateUtil.currentGMTTime(); + String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" + + " values(?, ?, ?, ?)"; + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + pstmt.setString(1, syncObjType); + pstmt.setLong(2, syncObjId); + pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt)); + pstmt.setString(4, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), dt)); + pstmt.execute(); + } catch (SQLException e) { + s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e); + } catch (Throwable e) { + s_logger.warn("Unable to create sync queue " + syncObjType + "-" + syncObjId + ":" + e.getMessage(), e); + } + } + + @Override + public SyncQueueVO find(String syncObjType, long syncObjId) { + SearchCriteria sc = TypeIdSearch.create(); + sc.setParameters("syncObjType", syncObjType); + sc.setParameters("syncObjId", syncObjId); + return findOneBy(sc); + } + +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java new file mode 100644 index 0000000..b78ccf7 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.dao; + +import java.util.List; + +import org.apache.cloudstack.framework.jobs.SyncQueueItemVO; + +import com.cloud.utils.db.GenericDao; + +public interface SyncQueueItemDao extends GenericDao { + public SyncQueueItemVO getNextQueueItem(long queueId); + public List getNextQueueItems(int maxItems); + public List getActiveQueueItems(Long msid, boolean exclusive); + public List getBlockedQueueItems(long thresholdMs, boolean exclusive); + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java new file mode 100644 index 0000000..08b777c --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.cloudstack.framework.jobs.dao; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.SyncQueueItemVO; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.DB; +import com.cloud.utils.db.Filter; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.GenericSearchBuilder; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.Transaction; + +@DB +public class SyncQueueItemDaoImpl extends GenericDaoBase implements SyncQueueItemDao { + private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); + final GenericSearchBuilder queueIdSearch; + + public SyncQueueItemDaoImpl() { + super(); + queueIdSearch = createSearchBuilder(Long.class); + queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ); + queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ); + queueIdSearch.selectField(queueIdSearch.entity().getId()); + queueIdSearch.done(); + } + + @Override + public SyncQueueItemVO getNextQueueItem(long queueId) { + + SearchBuilder sb = createSearchBuilder(); + sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ); + sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL); + sb.done(); + + SearchCriteria sc = sb.create(); + sc.setParameters("queueId", queueId); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L); + List l = listBy(sc, filter); + if(l != null && l.size() > 0) + return l.get(0); + + return null; + } + + @Override + public List getNextQueueItems(int maxItems) { + List l = new ArrayList(); + + String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + + " WHERE i.queue_proc_number IS NULL " + + " GROUP BY q.id " + + " ORDER BY i.id " + + " LIMIT 0, ?"; + + Transaction txn = Transaction.currentTxn(); + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement(sql); + pstmt.setInt(1, maxItems); + ResultSet rs = pstmt.executeQuery(); + while(rs.next()) { + SyncQueueItemVO item = new SyncQueueItemVO(); + item.setId(rs.getLong(1)); + item.setQueueId(rs.getLong(2)); + item.setContentType(rs.getString(3)); + item.setContentId(rs.getLong(4)); + item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5))); + l.add(item); + } + } catch (SQLException e) { + s_logger.error("Unexpected sql excetpion, ", e); + } catch (Throwable e) { + s_logger.error("Unexpected excetpion, ", e); + } + return l; + } + + @Override + public List getActiveQueueItems(Long msid, boolean exclusive) { + SearchBuilder sb = createSearchBuilder(); + sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(), + SearchCriteria.Op.EQ); + sb.done(); + + SearchCriteria sc = sb.create(); + sc.setParameters("lastProcessMsid", msid); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null); + + if(exclusive) + return lockRows(sc, filter, true); + return listBy(sc, filter); + } + + @Override + public List getBlockedQueueItems(long thresholdMs, boolean exclusive) { + Date cutTime = DateUtil.currentGMTTime(); + + SearchBuilder sbItem = createSearchBuilder(); + sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); + sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT); + + sbItem.done(); + + SearchCriteria sc = sbItem.create(); + sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs)); + + if(exclusive) + return lockRows(sc, null, true); + return listBy(sc, null); + } + + @Override + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) { + SearchCriteria sc = queueIdSearch.create(); + sc.setParameters("contentId", contentId); + sc.setParameters("contentType", contentType); + List id = customSearch(sc, null); + + return id.size() == 0 ? null : id.get(0); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index bc97b21..3a2b743 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -103,8 +103,6 @@ install - src - test resources http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiAsyncJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java index 0984d25..331d90f 100644 --- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java +++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java @@ -21,23 +21,25 @@ import java.util.Map; import javax.inject.Inject; +import org.apache.log4j.Logger; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + import org.apache.cloudstack.api.ApiErrorCode; import org.apache.cloudstack.api.BaseAsyncCmd; import org.apache.cloudstack.api.ServerApiException; import org.apache.cloudstack.api.response.ExceptionResponse; -import org.apache.log4j.Logger; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; -import com.cloud.async.AsyncJob; -import com.cloud.async.AsyncJobConstants; -import com.cloud.async.AsyncJobDispatcher; -import com.cloud.async.AsyncJobManager; import com.cloud.user.Account; import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; import com.cloud.utils.component.AdapterBase; import com.cloud.utils.component.ComponentContext; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispatcher { private static final Logger s_logger = Logger.getLogger(ApiAsyncJobDispatcher.class); @@ -51,7 +53,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat } @Override - public void runJob(AsyncJob job) { + public void runJob(AsyncJob job) { BaseAsyncCmd cmdObj = null; try { Class cmdClass = Class.forName(job.getCmd()); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiDBUtils.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiDBUtils.java b/server/src/com/cloud/api/ApiDBUtils.java index f9bdcf3..5a86160 100755 --- a/server/src/com/cloud/api/ApiDBUtils.java +++ b/server/src/com/cloud/api/ApiDBUtils.java @@ -25,10 +25,12 @@ import java.util.Set; import javax.annotation.PostConstruct; import javax.inject.Inject; -import org.apache.cloudstack.api.ApiCommandJobType; +import org.springframework.stereotype.Component; + import org.apache.cloudstack.affinity.AffinityGroup; import org.apache.cloudstack.affinity.AffinityGroupResponse; import org.apache.cloudstack.affinity.dao.AffinityGroupDao; +import org.apache.cloudstack.api.ApiCommandJobType; import org.apache.cloudstack.api.ApiConstants.HostDetails; import org.apache.cloudstack.api.ApiConstants.VMDetails; import org.apache.cloudstack.api.response.AccountResponse; @@ -51,10 +53,12 @@ import org.apache.cloudstack.api.response.UserResponse; import org.apache.cloudstack.api.response.UserVmResponse; import org.apache.cloudstack.api.response.VolumeResponse; import org.apache.cloudstack.api.response.ZoneResponse; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao; import org.apache.cloudstack.lb.dao.ApplicationLoadBalancerRuleDao; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; -import org.springframework.stereotype.Component; import com.cloud.api.query.dao.AccountJoinDao; import com.cloud.api.query.dao.AffinityGroupJoinDao; @@ -93,9 +97,6 @@ import com.cloud.api.query.vo.StoragePoolJoinVO; import com.cloud.api.query.vo.UserAccountJoinVO; import com.cloud.api.query.vo.UserVmJoinVO; import com.cloud.api.query.vo.VolumeJoinVO; -import com.cloud.async.AsyncJob; -import com.cloud.async.AsyncJobManager; -import com.cloud.async.dao.AsyncJobDao; import com.cloud.capacity.CapacityVO; import com.cloud.capacity.dao.CapacityDao; import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity; @@ -129,8 +130,6 @@ import com.cloud.host.HostVO; import com.cloud.host.dao.HostDao; import com.cloud.host.dao.HostDetailsDao; import com.cloud.hypervisor.Hypervisor.HypervisorType; -import com.cloud.network.dao.AccountGuestVlanMapDao; -import com.cloud.network.dao.AccountGuestVlanMapVO; import com.cloud.network.IpAddress; import com.cloud.network.Network; import com.cloud.network.Network.Capability; @@ -1287,12 +1286,7 @@ public class ApiDBUtils { return _vpcOfferingDao.findById(offeringId); } - - public static AsyncJob findAsyncJobById(long jobId){ - return _asyncJobDao.findById(jobId); - } - - public static String findJobInstanceUuid(AsyncJob job){ + public static String findJobInstanceUuid(AsyncJob job) { if ( job == null ) return null; String jobInstanceId = null; @@ -1605,7 +1599,7 @@ public class ApiDBUtils { return _jobJoinDao.newAsyncJobResponse(ve); } - public static AsyncJobJoinVO newAsyncJobView(AsyncJob e){ + public static AsyncJobJoinVO newAsyncJobView(AsyncJob e) { return _jobJoinDao.newAsyncJobView(e); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index afcfda3..a1e467e 100755 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -33,6 +33,9 @@ import java.util.regex.Matcher; import javax.annotation.PostConstruct; import javax.inject.Inject; +import org.apache.log4j.Logger; +import org.springframework.stereotype.Component; + import org.apache.cloudstack.acl.ControlledEntity; import org.apache.cloudstack.acl.InfrastructureEntity; import org.apache.cloudstack.acl.SecurityChecker.AccessType; @@ -52,11 +55,10 @@ import org.apache.cloudstack.api.Validate; import org.apache.cloudstack.api.command.user.event.ArchiveEventsCmd; import org.apache.cloudstack.api.command.user.event.DeleteEventsCmd; import org.apache.cloudstack.api.command.user.event.ListEventsCmd; -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; import com.cloud.async.AsyncJobExecutionContext; -import com.cloud.async.AsyncJobManager; import com.cloud.dao.EntityManager; import com.cloud.exception.InvalidParameterValueException; import com.cloud.user.Account; @@ -149,7 +151,7 @@ public class ApiDispatcher { if (queueSizeLimit != null) { if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) { // if we are not within async-execution context, enqueue the command - _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit); + _asyncMgr.syncAsyncJobExecution((AsyncJob)asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit); return; } } else { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiResponseHelper.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiResponseHelper.java b/server/src/com/cloud/api/ApiResponseHelper.java index d5960ab..0934655 100755 --- a/server/src/com/cloud/api/ApiResponseHelper.java +++ b/server/src/com/cloud/api/ApiResponseHelper.java @@ -34,6 +34,9 @@ import java.util.TimeZone; import javax.inject.Inject; +import org.apache.log4j.Logger; +import org.springframework.stereotype.Component; + import org.apache.cloudstack.acl.ControlledEntity; import org.apache.cloudstack.acl.ControlledEntity.ACLType; import org.apache.cloudstack.affinity.AffinityGroup; @@ -59,7 +62,6 @@ import org.apache.cloudstack.api.response.ConfigurationResponse; import org.apache.cloudstack.api.response.ControlledEntityResponse; import org.apache.cloudstack.api.response.ControlledViewEntityResponse; import org.apache.cloudstack.api.response.CounterResponse; -import org.apache.cloudstack.api.response.CreateCmdResponse; import org.apache.cloudstack.api.response.DiskOfferingResponse; import org.apache.cloudstack.api.response.DomainResponse; import org.apache.cloudstack.api.response.DomainRouterResponse; @@ -134,14 +136,13 @@ import org.apache.cloudstack.api.response.VpcOfferingResponse; import org.apache.cloudstack.api.response.VpcResponse; import org.apache.cloudstack.api.response.VpnUsersResponse; import org.apache.cloudstack.api.response.ZoneResponse; +import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.network.lb.ApplicationLoadBalancerRule; import org.apache.cloudstack.region.Region; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.usage.Usage; import org.apache.cloudstack.usage.UsageService; import org.apache.cloudstack.usage.UsageTypes; -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; import com.cloud.api.query.ViewResponseHelper; import com.cloud.api.query.vo.AccountJoinVO; @@ -163,8 +164,6 @@ import com.cloud.api.query.vo.StoragePoolJoinVO; import com.cloud.api.query.vo.UserAccountJoinVO; import com.cloud.api.query.vo.UserVmJoinVO; import com.cloud.api.query.vo.VolumeJoinVO; -import com.cloud.api.response.ApiResponseSerializer; -import com.cloud.async.AsyncJob; import com.cloud.capacity.Capacity; import com.cloud.capacity.CapacityVO; import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity; @@ -296,8 +295,8 @@ public class ApiResponseHelper implements ResponseGenerator { public final Logger s_logger = Logger.getLogger(ApiResponseHelper.class); private static final DecimalFormat s_percentFormat = new DecimalFormat("##.##"); - @Inject private EntityManager _entityMgr = null; - @Inject private UsageService _usageSvc = null; + @Inject private final EntityManager _entityMgr = null; + @Inject private final UsageService _usageSvc = null; @Inject NetworkModel _ntwkModel; @Override @@ -434,7 +433,7 @@ public class ApiResponseHelper implements ResponseGenerator { DataCenter zone = ApiDBUtils.findZoneById(volume.getDataCenterId()); if (zone != null) { snapshotResponse.setZoneName(zone.getName()); - snapshotResponse.setZoneType(zone.getNetworkType().toString()); + snapshotResponse.setZoneType(zone.getNetworkType().toString()); } } snapshotResponse.setCreated(snapshot.getCreated()); @@ -827,7 +826,7 @@ public class ApiResponseHelper implements ResponseGenerator { capacityResponse.setCapacityType(capacity.getCapacityType()); capacityResponse.setCapacityUsed(capacity.getUsedCapacity()); if (capacity.getCapacityType() == Capacity.CAPACITY_TYPE_CPU) { - capacityResponse.setCapacityTotal(new Long((long) (capacity.getTotalCapacity()))); + capacityResponse.setCapacityTotal(new Long((capacity.getTotalCapacity()))); } else if (capacity.getCapacityType() == Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED) { List c = ApiDBUtils.findNonSharedStorageForClusterPodZone(null, pod.getId(), null); capacityResponse.setCapacityTotal(capacity.getTotalCapacity() - c.get(0).getTotalCapacity()); @@ -1820,12 +1819,6 @@ public class ApiResponseHelper implements ResponseGenerator { } - @Override - public String toSerializedString(CreateCmdResponse response, String responseType) { - return ApiResponseSerializer.toSerializedString(response, responseType); - } - - @Override public AsyncJobResponse createAsyncJobResponse(AsyncJob job) { AsyncJobJoinVO vJob = ApiDBUtils.newAsyncJobView(job); return ApiDBUtils.newAsyncJobResponse(vJob); @@ -2375,7 +2368,7 @@ public class ApiResponseHelper implements ResponseGenerator { } // populate network offering information - NetworkOffering networkOffering = (NetworkOffering) ApiDBUtils.findNetworkOfferingById(network.getNetworkOfferingId()); + NetworkOffering networkOffering = ApiDBUtils.findNetworkOfferingById(network.getNetworkOfferingId()); if (networkOffering != null) { response.setNetworkOfferingId(networkOffering.getUuid()); response.setNetworkOfferingName(networkOffering.getName()); @@ -3665,6 +3658,7 @@ public class ApiResponseHelper implements ResponseGenerator { return response; } + @Override public NicSecondaryIpResponse createSecondaryIPToNicResponse(NicSecondaryIp result) { NicSecondaryIpResponse response = new NicSecondaryIpResponse(); NicVO nic = _entityMgr.findById(NicVO.class, result.getNicId()); @@ -3677,6 +3671,7 @@ public class ApiResponseHelper implements ResponseGenerator { return response; } + @Override public NicResponse createNicResponse(Nic result) { NicResponse response = new NicResponse(); NetworkVO network = _entityMgr.findById(NetworkVO.class, result.getNetworkId()); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/ApiServer.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 0f375b8..0cd7201 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -109,17 +109,20 @@ import org.apache.cloudstack.api.command.user.vm.ListVMsCmd; import org.apache.cloudstack.api.command.user.vmgroup.ListVMGroupsCmd; import org.apache.cloudstack.api.command.user.volume.ListVolumesCmd; import org.apache.cloudstack.api.command.user.zone.ListZonesByCmd; +import org.apache.cloudstack.api.response.AsyncJobResponse; +import org.apache.cloudstack.api.response.CreateCmdResponse; import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.cloudstack.api.response.ListResponse; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.AsyncJobVO; import org.apache.cloudstack.region.RegionManager; import com.cloud.api.response.ApiResponseSerializer; -import com.cloud.async.AsyncJob; -import com.cloud.async.AsyncJobManager; -import com.cloud.async.AsyncJobVO; import com.cloud.configuration.Config; import com.cloud.configuration.ConfigurationVO; import com.cloud.configuration.dao.ConfigurationDao; +import com.cloud.dao.EntityManager; import com.cloud.domain.Domain; import com.cloud.domain.DomainVO; import com.cloud.event.ActionEventUtils; @@ -149,7 +152,7 @@ import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; -@Component +@Component public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiServerService { private static final Logger s_logger = Logger.getLogger(ApiServer.class.getName()); private static final Logger s_accessLogger = Logger.getLogger("apiserver." + ApiServer.class.getName()); @@ -166,6 +169,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer @Inject List _pluggableServices; @Inject List _apiAccessCheckers; + @Inject + private EntityManager _entityMgr; + @Inject private final RegionManager _regionMgr = null; private static int _workerCount = 0; @@ -444,6 +450,24 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer return response; } + private String getBaseAsyncResponse(long jobId, BaseAsyncCmd cmd) { + AsyncJobResponse response = new AsyncJobResponse(); + + AsyncJob job = _entityMgr.findById(AsyncJob.class, jobId); + response.setJobId(job.getUuid()); + response.setResponseName(cmd.getCommandName()); + return ApiResponseSerializer.toSerializedString(response, cmd.getResponseType()); + } + + private String getBaseAsyncCreateResponse(long jobId, BaseAsyncCreateCmd cmd, String objectUuid) { + CreateCmdResponse response = new CreateCmdResponse(); + AsyncJob job = _entityMgr.findById(AsyncJob.class, jobId); + response.setJobId(job.getUuid()); + response.setId(objectUuid); + response.setResponseName(cmd.getCommandName()); + return ApiResponseSerializer.toSerializedString(response, cmd.getResponseType()); + } + private String queueCommand(BaseCmd cmdObj, Map params) throws Exception { UserContext ctx = UserContext.current(); Long callerUserId = ctx.getCallerUserId(); @@ -494,7 +518,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer Long instanceId = (objectId == null) ? asyncCmd.getInstanceId() : objectId; AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(), - ApiGsonHelper.getBuilder().create().toJson(params), instanceId, + ApiGsonHelper.getBuilder().create().toJson(params), instanceId, asyncCmd.getInstanceType() != null ? asyncCmd.getInstanceType().toString() : null); long jobId = _asyncMgr.submitAsyncJob(job); @@ -507,11 +531,11 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer if (objectId != null) { String objUuid = (objectUuid == null) ? objectId.toString() : objectUuid; - return ((BaseAsyncCreateCmd) asyncCmd).getResponse(jobId, objUuid); + return getBaseAsyncCreateResponse(jobId, (BaseAsyncCreateCmd)asyncCmd, objUuid); + } else { + SerializationContext.current().setUuidTranslation(true); + return getBaseAsyncResponse(jobId, asyncCmd); } - - SerializationContext.current().setUuidTranslation(true); - return ApiResponseSerializer.toSerializedString(asyncCmd.getResponse(jobId), asyncCmd.getResponseType()); } else { _dispatcher.dispatch(cmdObj, params); @@ -564,7 +588,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer if (job.getInstanceId() == null) { continue; } - String instanceUuid = ApiDBUtils.findJobInstanceUuid(job); + String instanceUuid = job.getUuid(); if (instanceUuid != null) { objectJobMap.put(instanceUuid, job); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java b/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java index f7a2c8c..756425f 100644 --- a/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java +++ b/server/src/com/cloud/api/query/dao/AsyncJobJoinDao.java @@ -16,12 +16,10 @@ // under the License. package com.cloud.api.query.dao; -import java.util.List; - import org.apache.cloudstack.api.response.AsyncJobResponse; +import org.apache.cloudstack.framework.jobs.AsyncJob; import com.cloud.api.query.vo.AsyncJobJoinVO; -import com.cloud.async.AsyncJob; import com.cloud.utils.db.GenericDao; public interface AsyncJobJoinDao extends GenericDao { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java b/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java index fb5695b..5090475 100644 --- a/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java +++ b/server/src/com/cloud/api/query/dao/AsyncJobJoinDaoImpl.java @@ -22,15 +22,15 @@ import java.util.List; import javax.ejb.Local; import org.apache.log4j.Logger; +import org.springframework.stereotype.Component; -import com.cloud.api.ApiSerializerHelper; -import com.cloud.api.SerializationContext; -import com.cloud.api.query.vo.AsyncJobJoinVO; -import com.cloud.async.AsyncJob; import org.apache.cloudstack.api.ResponseObject; import org.apache.cloudstack.api.response.AsyncJobResponse; -import org.springframework.stereotype.Component; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import com.cloud.api.ApiSerializerHelper; +import com.cloud.api.SerializationContext; +import com.cloud.api.query.vo.AsyncJobJoinVO; import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; @@ -40,7 +40,7 @@ import com.cloud.utils.db.SearchCriteria; public class AsyncJobJoinDaoImpl extends GenericDaoBase implements AsyncJobJoinDao { public static final Logger s_logger = Logger.getLogger(AsyncJobJoinDaoImpl.class); - private SearchBuilder jobIdSearch; + private final SearchBuilder jobIdSearch; protected AsyncJobJoinDaoImpl() { @@ -49,7 +49,7 @@ public class AsyncJobJoinDaoImpl extends GenericDaoBase im jobIdSearch.and("id", jobIdSearch.entity().getId(), SearchCriteria.Op.EQ); jobIdSearch.done(); - this._count = "select count(distinct id) from async_job_view WHERE "; + _count = "select count(distinct id) from async_job_view WHERE "; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobConstants.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobConstants.java b/server/src/com/cloud/async/AsyncJobConstants.java deleted file mode 100644 index 6081d0c..0000000 --- a/server/src/com/cloud/async/AsyncJobConstants.java +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - -public interface AsyncJobConstants { - public static final int STATUS_IN_PROGRESS = 0; - public static final int STATUS_SUCCEEDED = 1; - public static final int STATUS_FAILED = 2; - - public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher"; - public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread"; - - public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor"; - - // Although we may have detailed masks for each individual wakeup event, i.e. - // periodical timer, matched topic from message bus, it seems that we don't - // need to distinguish them to such level. Therefore, only one wakeup signal - // is defined - public static final int SIGNAL_MASK_WAKEUP = 1; -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java deleted file mode 100644 index ed03d33..0000000 --- a/server/src/com/cloud/async/AsyncJobDispatcher.java +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - -import com.cloud.utils.component.Adapter; - -public interface AsyncJobDispatcher extends Adapter { - void runJob(AsyncJob job); -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobExecutionContext.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java index 988f4e6..e6efd03 100644 --- a/server/src/com/cloud/async/AsyncJobExecutionContext.java +++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java @@ -18,7 +18,13 @@ package com.cloud.async; import javax.inject.Inject; -import com.cloud.async.dao.AsyncJobJoinMapDao; +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.SyncQueueItem; +import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; + import com.cloud.exception.ConcurrentOperationException; import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.ResourceUnavailableException; @@ -26,7 +32,7 @@ import com.cloud.serializer.SerializerHelper; import com.cloud.utils.component.ComponentContext; public class AsyncJobExecutionContext { - private AsyncJob _job; + private AsyncJob _job; @Inject private AsyncJobManager _jobMgr; @Inject private AsyncJobJoinMapDao _joinMapDao; @@ -36,7 +42,7 @@ public class AsyncJobExecutionContext { public AsyncJobExecutionContext() { } - public AsyncJobExecutionContext(AsyncJob job) { + public AsyncJobExecutionContext(AsyncJob job) { _job = job; } @@ -48,7 +54,7 @@ public class AsyncJobExecutionContext { _job.setSyncSource(null); } - public AsyncJob getJob() { + public AsyncJob getJob() { if(_job == null) { _job = _jobMgr.getPseudoJob(); } @@ -56,7 +62,7 @@ public class AsyncJobExecutionContext { return _job; } - public void setJob(AsyncJob job) { + public void setJob(AsyncJob job) { _job = job; } @@ -75,7 +81,7 @@ public class AsyncJobExecutionContext { _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId); } - public void logJobJournal(AsyncJob.JournalType journalType, String + public void logJobJournal(AsyncJob.JournalType journalType, String journalText, String journalObjJson) { assert(_job != null); _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); @@ -89,14 +95,14 @@ public class AsyncJobExecutionContext { public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher, String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) { assert(_job != null); - _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, + _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, wakeupIntervalInMilliSeconds, timeoutInMilliSeconds); } // // check failure exception before we disjoin the worker job // TODO : it is ugly and this will become unnecessary after we switch to full-async mode - // + // public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException { assert(_job != null); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/cbdc4063/server/src/com/cloud/async/AsyncJobMBean.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobMBean.java b/server/src/com/cloud/async/AsyncJobMBean.java deleted file mode 100644 index 15d6595..0000000 --- a/server/src/com/cloud/async/AsyncJobMBean.java +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - -public interface AsyncJobMBean { - public long getAccountId(); - public long getUserId(); - public String getCmd(); - public String getCmdInfo(); - public String getStatus(); - public int getProcessStatus(); - public int getResultCode(); - public String getResult(); - public String getInstanceType(); - public String getInstanceId(); - public String getInitMsid(); - public String getCreateTime(); - public String getLastUpdateTime(); - public String getLastPollTime(); - public String getSyncQueueId(); - public String getSyncQueueContentType(); - public String getSyncQueueContentId(); -}