Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7189FF35 for ; Wed, 5 Nov 2014 10:45:04 +0000 (UTC) Received: (qmail 49999 invoked by uid 500); 5 Nov 2014 10:44:58 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 49948 invoked by uid 500); 5 Nov 2014 10:44:58 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 49795 invoked by uid 99); 5 Nov 2014 10:44:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Nov 2014 10:44:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 008F4A0A27B; Wed, 5 Nov 2014 10:44:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rajani@apache.org To: commits@cloudstack.apache.org Date: Wed, 05 Nov 2014 10:45:29 -0000 Message-Id: In-Reply-To: <05c6863fb3b34541b89505ec934bced5@git.apache.org> References: <05c6863fb3b34541b89505ec934bced5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] git commit: updated refs/heads/master to 4c5f792 CLOUDSTACK-7833: VM Async work jobs log "Was unable to find lock for the key vm_instance" errors as warning Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/07ba078e Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/07ba078e Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/07ba078e Branch: refs/heads/master Commit: 07ba078ee6f30b02dff6e1942e5f3da3121f4ea1 Parents: 6830cbc Author: Min Chen Authored: Thu Oct 30 18:14:15 2014 -0700 Committer: Min Chen Committed: Mon Nov 3 11:19:06 2014 -0800 ---------------------------------------------------------------------- .../com/cloud/vm/VirtualMachineManagerImpl.java | 773 ++++++++----------- .../jobs/impl/AsyncJobManagerImpl.java | 67 +- .../com/cloud/storage/VolumeApiServiceImpl.java | 247 ++---- .../vm/snapshot/VMSnapshotManagerImpl.java | 195 ++--- 4 files changed, 486 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/07ba078e/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index ec4aa1d..4b1597a 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -183,7 +183,6 @@ import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; -import com.cloud.utils.db.TransactionCallback; import com.cloud.utils.db.TransactionCallbackWithException; import com.cloud.utils.db.TransactionCallbackWithExceptionNoReturn; import com.cloud.utils.db.TransactionLegacy; @@ -3936,55 +3935,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; + VmWorkJobVO workJob = null; + List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, + vm.getId(), VmWorkStart.class.getName()); - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, - vm.getId(), VmWorkStart.class.getName()); - - if (pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStart.class.getName()); - - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - - // save work context info (there are some duplications) - VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER); - workInfo.setPlan(planToDeploy); - if (planner != null) { - workInfo.setDeploymentPlanner(planner.getName()); - } - workInfo.setParams(params); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + if (pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStart.class.getName()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + + // save work context info (there are some duplications) + VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER); + workInfo.setPlan(planToDeploy); + workInfo.setParams(params); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmStateSyncOutcome((VmWorkJobVO)result[0], + return new VmStateSyncOutcome(workJob, VirtualMachine.PowerState.PowerOn, vm.getId(), null); } @@ -3995,51 +3977,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + vm.getType(), vm.getId(), + VmWorkStop.class.getName()); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - vm.getType(), vm.getId(), - VmWorkStop.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStop.class.getName()); - - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - - // save work context info (there are some duplications) - VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStop.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + + // save work context info (there are some duplications) + VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmStateSyncOutcome((VmWorkJobVO)result[0], + return new VmStateSyncOutcome(workJob, VirtualMachine.PowerState.PowerOff, vm.getId(), null); } @@ -4052,50 +4020,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkReboot.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkReboot.class.getName()); - - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - - // save work context info (there are some duplications) - VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReboot.class.getName()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkReboot.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setStep(VmWorkJobVO.Step.Prepare); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + + // save work context info (there are some duplications) + VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } @@ -4106,50 +4061,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrate.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrate.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - // save work context info (there are some duplications) - VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + // save work context info (there are some duplications) + VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmStateSyncOutcome((VmWorkJobVO)result[0], + return new VmStateSyncOutcome(workJob, VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); } @@ -4160,49 +4102,36 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrateAway.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrateAway.class.getName()); - - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - - // save work context info (there are some duplications) - VmWorkMigrateAway workInfo = new VmWorkMigrateAway(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateAway.class.getName()); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { + workJob = new VmWorkJobVO(context.getContextId()); - return new VmStateSyncOutcome((VmWorkJobVO)result[0], - VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrateAway.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + + // save work context info (there are some duplications) + VmWorkMigrateAway workInfo = new VmWorkMigrateAway(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + } + + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); + + return new VmStateSyncOutcome(workJob, VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId()); } public Outcome migrateVmWithStorageThroughJobQueue( @@ -4215,51 +4144,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrateWithStorage.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateWithStorage.class.getName()); - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); - // save work context info (there are some duplications) - VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmStateSyncOutcome((VmWorkJobVO)result[0], + return new VmStateSyncOutcome(workJob, VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId); } @@ -4272,52 +4187,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkMigrateForScale.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - - workJob = new VmWorkJobVO(context.getContextId()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkMigrateForScale.class.getName()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrate.class.getName()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob = new VmWorkJobVO(context.getContextId()); - // save work context info (there are some duplications) - VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrate.class.getName()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } public Outcome migrateVmStorageThroughJobQueue( @@ -4329,52 +4229,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkStorageMigration.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { - - workJob = new VmWorkJobVO(context.getContextId()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkStorageMigration.class.getName()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkStorageMigration.class.getName()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob = new VmWorkJobVO(context.getContextId()); - // save work context info (there are some duplications) - VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId()); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkStorageMigration.class.getName()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId()); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } public Outcome addVmToNetworkThroughJobQueue( @@ -4384,52 +4269,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkAddVmToNetwork.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkAddVmToNetwork.class.getName()); - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); - // save work context info (there are some duplications) - VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } public Outcome removeNicFromVmThroughJobQueue( @@ -4439,51 +4309,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkRemoveNicFromVm.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveNicFromVm.class.getName()); - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRemoveNicFromVm.class.getName()); - // save work context info (there are some duplications) - VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic.getId()); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic.getId()); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } public Outcome removeVmFromNetworkThroughJobQueue( @@ -4493,51 +4349,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final User user = context.getCallingUser(); final Account account = context.getCallingAccount(); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkRemoveVmFromNetwork.class.getName()); + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkRemoveVmFromNetwork.class.getName()); - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob = new VmWorkJobVO(context.getContextId()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - // save work context info (there are some duplications) - VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + // save work context info (there are some duplications) + VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network, broadcastUri); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } public Outcome reconfigureVmThroughJobQueue( @@ -4549,51 +4392,37 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - - _vmDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - List pendingWorkJobs = _workJobDao.listPendingWorkJobs( - VirtualMachine.Type.Instance, vm.getId(), - VmWorkReconfigure.class.getName()); - - VmWorkJobVO workJob = null; - if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert (pendingWorkJobs.size() == 1); - workJob = pendingWorkJobs.get(0); - } else { + List pendingWorkJobs = _workJobDao.listPendingWorkJobs( + VirtualMachine.Type.Instance, vm.getId(), + VmWorkReconfigure.class.getName()); - workJob = new VmWorkJobVO(context.getContextId()); + VmWorkJobVO workJob = null; + if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { + assert (pendingWorkJobs.size() == 1); + workJob = pendingWorkJobs.get(0); + } else { - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkReconfigure.class.getName()); + workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkReconfigure.class.getName()); - // save work context info (there are some duplications) - VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, newServiceOffering.getId(), reconfiguringOnExistingHost); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - } - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + // save work context info (there are some duplications) + VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, newServiceOffering.getId(), reconfiguringOnExistingHost); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId()); + return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } @ReflectionUse http://git-wip-us.apache.org/repos/asf/cloudstack/blob/07ba078e/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index aab1683..548182a 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -79,6 +79,7 @@ import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.ExceptionUtil; import com.cloud.utils.mgmt.JmxUtil; +import com.cloud.vm.dao.VMInstanceDao; public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener, Configurable { // Advanced @@ -86,6 +87,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, "Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global); private static final ConfigKey JobCancelThresholdMinutes = new ConfigKey("Advanced", Long.class, "job.cancel.threshold.minutes", "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global); + private static final ConfigKey VmJobLockTimeout = new ConfigKey("Advanced", + Integer.class, "vm.job.lock.timeout", "1800", + "Time in seconds to wait in acquiring lock to submit a vm worker job", false); private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); @@ -111,6 +115,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private MessageBus _messageBus; @Inject private AsyncJobMonitor _jobMonitor; + @Inject + private VMInstanceDao _vmInstanceDao; private volatile long _executionRunNumber = 1; @@ -125,7 +131,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Override public ConfigKey[] getConfigKeys() { - return new ConfigKey[] {JobExpireMinutes, JobCancelThresholdMinutes}; + return new ConfigKey[] {JobExpireMinutes, JobCancelThresholdMinutes, VmJobLockTimeout}; } @Override @@ -185,18 +191,27 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @SuppressWarnings("rawtypes") final GenericDao dao = GenericDaoBase.getDao(job.getClass()); - return Transaction.execute(new TransactionCallback() { - @Override - public Long doInTransaction(TransactionStatus status) { - job.setInitMsid(getMsid()); - dao.persist(job); + publishOnEventBus(job, "submit"); - publishOnEventBus(job, "submit"); - syncAsyncJobExecution(job, syncObjType, syncObjId, 1); + if (!_vmInstanceDao.lockInLockTable(String.valueOf(syncObjId), VmJobLockTimeout.value())){ + throw new CloudRuntimeException("Failed to acquire lock in submitting async job: " + job.getCmd() + " with timeout value = " + VmJobLockTimeout.value()); + } - return job.getId(); - } - }); + try { + // lock is acquired + return Transaction.execute(new TransactionCallback() { + @Override + public Long doInTransaction(TransactionStatus status) { + job.setInitMsid(getMsid()); + dao.persist(job); + + syncAsyncJobExecution(job, syncObjType, syncObjId, 1); + return job.getId(); + } + }); + } finally { + _vmInstanceDao.unlockFromLockTable(String.valueOf(syncObjId)); + } } catch (Exception e) { String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception."; s_logger.warn(errMsg, e); @@ -239,13 +254,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out if (s_logger.isDebugEnabled()) { - s_logger.debug("Wake up jobs related to job- " + jobId); + s_logger.debug("Wake up jobs related to job-" + jobId); } List wakeupList = Transaction.execute(new TransactionCallback>() { @Override public List doInTransaction(TransactionStatus status) { if (s_logger.isDebugEnabled()) { - s_logger.debug("Update db status for job- " + jobId); + s_logger.debug("Update db status for job-" + jobId); } job.setCompleteMsid(getMsid()); job.setStatus(jobStatus); @@ -262,15 +277,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, _jobDao.update(jobId, job); if (s_logger.isDebugEnabled()) { - s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId); + s_logger.debug("Wake up jobs joined with job-" + jobId + " and disjoin all subjobs created from job- " + jobId); } List wakeupList = wakeupByJoinedJobCompletion(jobId); _joinMapDao.disjoinAllJobs(jobId); // purge the job sync item from queue - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); - } + _queueMgr.purgeAsyncJobQueueItemId(jobId); return wakeupList; } @@ -856,20 +869,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, final SearchCriteria jobsSC = JobIdsSearch.create("ids", ids); final SearchCriteria queueItemsSC = QueueJobIdsSearch.create("contentIds", ids); - Transaction.execute(new TransactionCallbackNoReturn() { - @Override - public void doInTransactionWithoutResult(TransactionStatus status) { - AsyncJobVO job = _jobDao.createForUpdate(); - job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP); - _jobDao.update(job, jobsSC); + AsyncJobVO job = _jobDao.createForUpdate(); + job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP); + _jobDao.update(job, jobsSC); - SyncQueueItemVO item = _queueItemDao.createForUpdate(); - item.setLastProcessNumber(null); - item.setLastProcessMsid(null); - _queueItemDao.update(item, queueItemsSC); - } - }); + SyncQueueItemVO item = _queueItemDao.createForUpdate(); + item.setLastProcessNumber(null); + item.setLastProcessMsid(null); + _queueItemDao.update(item, queueItemsSC); } + return _joinMapDao.findJobsToWake(joinedJobId); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/07ba078e/server/src/com/cloud/storage/VolumeApiServiceImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index 9f132c3..3184dca 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -2379,47 +2379,31 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic final VMInstanceVO vm = _vmInstanceDao.findById(vmId); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; + VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId()); - _vmInstanceDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - workJob = new VmWorkJobVO(context.getContextId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAttachVolume.class.getName()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkAttachVolume.class.getName()); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + // save work context info (there are some duplications) + VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - // save work context info (there are some duplications) - VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), - VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + AsyncJobVO jobVo = _jobMgr.getAsyncJob(workJob.getId()); + s_logger.debug("New job " + workJob.getId() + ", result field: " + jobVo.getResult()); - AsyncJobVO jobVo = _jobMgr.getAsyncJob(workJob.getId()); - s_logger.debug("New job " + workJob.getId() + ", result field: " + jobVo.getResult()); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmInstanceDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); - - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - - return new VmJobVolumeOutcome((VmWorkJobVO)result[0], - volumeId); + return new VmJobVolumeOutcome(workJob, volumeId); } public Outcome detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) { @@ -2430,44 +2414,28 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic final VMInstanceVO vm = _vmInstanceDao.findById(vmId); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; - - _vmInstanceDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkDetachVolume.class.getName()); + VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkDetachVolume.class.getName()); - // save work context info (there are some duplications) - VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), - VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + // save work context info (there are some duplications) + VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmInstanceDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVolumeOutcome((VmWorkJobVO)result[0], - volumeId); + return new VmJobVolumeOutcome(workJob, volumeId); } public Outcome resizeVolumeThroughJobQueue(final Long vmId, final long volumeId, @@ -2479,45 +2447,28 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic final VMInstanceVO vm = _vmInstanceDao.findById(vmId); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; - - _vmInstanceDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - - try { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkResizeVolume.class.getName()); + VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkResizeVolume.class.getName()); - // save work context info (there are some duplications) - VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), - VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize, newMinIops, newMaxIops, newServiceOfferingId, shrinkOk); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + // save work context info (there are some duplications) + VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize, newMinIops, newMaxIops, newServiceOfferingId, shrinkOk); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmInstanceDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobVolumeOutcome((VmWorkJobVO)result[0], - volumeId); + return new VmJobVolumeOutcome(workJob,volumeId); } public Outcome migrateVolumeThroughJobQueue(final Long vmId, final long volumeId, @@ -2529,44 +2480,28 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic final VMInstanceVO vm = _vmInstanceDao.findById(vmId); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; + VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId()); - _vmInstanceDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - workJob = new VmWorkJobVO(context.getContextId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkMigrateVolume.class.getName()); - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkMigrateVolume.class.getName()); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + // save work context info (there are some duplications) + VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, destPoolId, liveMigrate); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - // save work context info (there are some duplications) - VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), - VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, destPoolId, liveMigrate); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmInstanceDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); - - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); - - return new VmJobVolumeOutcome((VmWorkJobVO)result[0], - volumeId); + return new VmJobVolumeOutcome(workJob,volumeId); } public Outcome takeVolumeSnapshotThroughJobQueue(final Long vmId, final Long volumeId, @@ -2578,45 +2513,29 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic final VMInstanceVO vm = _vmInstanceDao.findById(vmId); - Object[] result = Transaction.execute(new TransactionCallback() { - @Override - public Object[] doInTransaction(TransactionStatus status) { - VmWorkJobVO workJob = null; - - _vmInstanceDao.lockInLockTable(String.valueOf(vm.getId()), Integer.MAX_VALUE); - try { - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkTakeVolumeSnapshot.class.getName()); + VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId()); - workJob.setAccountId(callingAccount.getId()); - workJob.setUserId(callingUser.getId()); - workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkTakeVolumeSnapshot.class.getName()); - // save work context info (there are some duplications) - VmWorkTakeVolumeSnapshot workInfo = new VmWorkTakeVolumeSnapshot( - callingUser.getId(), accountId != null ? accountId : callingAccount.getId(), vm.getId(), - VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, policyId, snapshotId, quiesceVm); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + workJob.setAccountId(callingAccount.getId()); + workJob.setUserId(callingUser.getId()); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + // save work context info (there are some duplications) + VmWorkTakeVolumeSnapshot workInfo = new VmWorkTakeVolumeSnapshot( + callingUser.getId(), accountId != null ? accountId : callingAccount.getId(), vm.getId(), + VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, policyId, snapshotId, quiesceVm); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - return new Object[] {workJob, new Long(workJob.getId())}; - } finally { - _vmInstanceDao.unlockFromLockTable(String.valueOf(vm.getId())); - } - } - }); + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); - final long jobId = (Long)result[1]; - AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); + AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); - return new VmJobSnapshotOutcome((VmWorkJobVO)result[0], - snapshotId); + return new VmJobSnapshotOutcome(workJob,snapshotId); } @ReflectionUse