cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject [08/33] git commit: updated refs/heads/master to 90262a8
Date Fri, 28 Feb 2014 23:37:28 GMT
CLOUDSTACK-5358: Bring back concurrency control in sync-queue management


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/441be43b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/441be43b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/441be43b

Branch: refs/heads/master
Commit: 441be43b8c4be79935c32d4711a10739d7a6eae9
Parents: d5dc6aa
Author: Kelven Yang <kelveny@gmail.com>
Authored: Mon Jan 20 16:53:17 2014 -0800
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Fri Feb 28 15:35:57 2014 -0800

----------------------------------------------------------------------
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 81 ++++++++++++--------
 .../com/cloud/vm/VmWorkStorageMigration.java    | 12 ++-
 .../framework/jobs/dao/SyncQueueItemDao.java    |  1 +
 .../jobs/dao/SyncQueueItemDaoImpl.java          | 35 +++++++--
 .../jobs/impl/AsyncJobManagerImpl.java          | 19 +----
 .../jobs/impl/SyncQueueManagerImpl.java         | 21 +++--
 6 files changed, 93 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index d19fc38..0ef275e 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
 import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
 import org.apache.cloudstack.framework.config.ConfigDepot;
 import org.apache.cloudstack.framework.config.ConfigKey;
@@ -769,7 +770,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -1317,7 +1318,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -1626,7 +1627,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -1718,7 +1719,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -2001,7 +2002,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -2014,7 +2015,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     throw (ResourceUnavailableException)jobException;
                 else if (jobException instanceof ConcurrentOperationException)
                     throw (ConcurrentOperationException)jobException;
-            }
+           }
         }
     }
 
@@ -2296,7 +2297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -2994,10 +2995,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             return;
         }
 
-        if (s_logger.isDebugEnabled())
+        if(s_logger.isDebugEnabled())
             s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
 
-        if (VmJobEnabled.value()) {
+        if(VmJobEnabled.value()) {
             _syncMgr.resetHostSyncState(agent.getId());
         }
 
@@ -3589,7 +3590,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
 
             try {
-                outcome.get();
+                VirtualMachine vm = outcome.get();
             } catch (InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (java.util.concurrent.ExecutionException e) {
@@ -3793,7 +3794,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     }
 
     public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
-    ResourceUnavailableException {
+            ResourceUnavailableException {
 
         boolean result = true;
         VMInstanceVO router = _vmDao.findById(vm.getId());
@@ -3828,7 +3829,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
     @Override
     public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
             boolean reconfiguringOnExistingHost)
-                    throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
+            throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
 
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
@@ -3974,8 +3975,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     handlePowerOffReportWithNoPendingJobsOnVM(vm);
                     break;
 
-                    // PowerUnknown shouldn't be reported, it is a derived
-                    // VM power state from host state (host un-reachable)
+                // PowerUnknown shouldn't be reported, it is a derived
+                // VM power state from host state (host un-reachable)
                 case PowerUnknown:
                 default:
                     assert (false);
@@ -4009,7 +4010,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             // we need to alert admin or user about this risky state transition
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
                     VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName()
-                    + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
+                            + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
             break;
 
         case Running:
@@ -4031,7 +4032,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             }
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
                     VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
-                    + " -> Running) from out-of-context transition. VM network environment may need to be reset");
+                            + " -> Running) from out-of-context transition. VM network environment may need to be reset");
             break;
 
         case Destroyed:
@@ -4074,7 +4075,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             }
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
                     VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
-                    + " -> Stopped) from out-of-context transition.");
+                            + " -> Stopped) from out-of-context transition.");
             // TODO: we need to forcely release all resource allocation
             break;
 
@@ -4101,7 +4102,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
         // however, if VM is missing from the host report (it may happen in out of band changes
         // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
         //
-        // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
+        // Therefore, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
         // and a VM stalls for status update, we will consider them to be powered off
         // (which is relatively safe to do so)
 
@@ -4134,7 +4135,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             // We now only alert administrator about this situation
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
                     VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState()
-                    + " state and its host is unreachable for too long");
+                            + " state and its host is unreachable for too long");
         }
     }
 
@@ -4332,7 +4333,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
 
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4383,7 +4384,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
 
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4436,7 +4437,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
 
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4458,6 +4459,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkMigrate.class.getName());
@@ -4485,7 +4488,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4540,7 +4543,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4564,6 +4567,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkMigrateForScale.class.getName());
@@ -4593,7 +4598,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
 
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4616,6 +4621,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkStorageMigration.class.getName());
@@ -4639,13 +4646,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     // save work context info (there are some duplications)
                     VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
-                            VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool);
+                            VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId());
                     workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
 
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4666,6 +4673,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkAddVmToNetwork.class.getName());
@@ -4694,7 +4703,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4715,6 +4724,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkRemoveNicFromVm.class.getName());
@@ -4743,7 +4754,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4764,6 +4775,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkRemoveVmFromNetwork.class.getName());
@@ -4792,7 +4805,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4815,6 +4828,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             @Override
             public Object[] doInTransaction(TransactionStatus status) {
 
+                _vmDao.lockRow(vm.getId(), true);
+
                 List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
                         VirtualMachine.Type.Instance, vm.getId(),
                         VmWorkReconfigure.class.getName());
@@ -4843,7 +4858,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
 
                     _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
                 }
-                return new Object[] {workJob, workJob.getId()};
+                return new Object[] {workJob, new Long(workJob.getId())};
             }
         });
 
@@ -4980,7 +4995,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements
VirtualMac
             s_logger.info("Unable to find vm " + work.getVmId());
         }
         assert (vm != null);
-        orchestrateStorageMigration(vm.getUuid(), work.getDestStoragePool());
+        StoragePool pool = (PrimaryDataStoreInfo)dataStoreMgr.getPrimaryDataStore(work.getDestStoragePoolId());
+        orchestrateStorageMigration(vm.getUuid(), pool);
+
         return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java b/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java
index 2b2f8e8..1d7d55e 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java
@@ -16,20 +16,18 @@
 // under the License.
 package com.cloud.vm;
 
-import com.cloud.storage.StoragePool;
-
 public class VmWorkStorageMigration extends VmWork {
     private static final long serialVersionUID = -8677979691741157474L;
 
-    StoragePool destPool;
+    Long destPoolId;
 
-    public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, StoragePool destPool) {
+    public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, Long destPoolId) {
         super(userId, accountId, vmId, handlerName);
 
-        this.destPool = destPool;
+        this.destPoolId = destPoolId;
     }
 
-    public StoragePool getDestStoragePool() {
-        return destPool;
+    public Long getDestStoragePoolId() {
+        return destPoolId;
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
index f5ac7b1..7b6eed7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -24,6 +24,7 @@ import com.cloud.utils.db.GenericDao;
 
 public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
     public SyncQueueItemVO getNextQueueItem(long queueId);
+    public int getActiveQueueItemCount(long queueId);
 
     public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 7363763..41f1419 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Func;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.TransactionLegacy;
 
@@ -43,6 +44,7 @@ import com.cloud.utils.db.TransactionLegacy;
 public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
     private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
     final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+    final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch;
 
     public SyncQueueItemDaoImpl() {
         super();
@@ -51,6 +53,12 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
         queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
         queueIdSearch.selectFields(queueIdSearch.entity().getId());
         queueIdSearch.done();
+
+        queueActiveItemSearch = createSearchBuilder(Integer.class);
+        queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ);
+        queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL);
+        queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId());
+        queueActiveItemSearch.done();
     }
 
     @Override
@@ -66,19 +74,31 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
 
         Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
         List<SyncQueueItemVO> l = listBy(sc, filter);
-        if (l != null && l.size() > 0)
+        if(l != null && l.size() > 0)
             return l.get(0);
 
         return null;
     }
 
     @Override
+    public int getActiveQueueItemCount(long queueId) {
+        SearchCriteria<Integer> sc = queueActiveItemSearch.create();
+        sc.setParameters("queueId", queueId);
+
+        List<Integer> count = customSearch(sc, null);
+        return count.get(0);
+    }
+
+    @Override
     public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
         List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
 
-        String sql =
-            "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id "
-                + " WHERE i.queue_proc_number IS NULL " + " GROUP BY q.id " + " ORDER BY i.id " + " LIMIT 0, ?";
+        String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
+                " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
+                     " WHERE i.queue_proc_number IS NULL " +
+                " GROUP BY q.id " +
+                " ORDER BY i.id " +
+                " LIMIT 0, ?";
 
         TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
@@ -86,7 +106,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
             pstmt = txn.prepareAutoCloseStatement(sql);
             pstmt.setInt(1, maxItems);
             ResultSet rs = pstmt.executeQuery();
-            while (rs.next()) {
+            while(rs.next()) {
                 SyncQueueItemVO item = new SyncQueueItemVO();
                 item.setId(rs.getLong(1));
                 item.setQueueId(rs.getLong(2));
@@ -106,7 +126,8 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
     @Override
     public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
         SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
-        sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(), SearchCriteria.Op.EQ);
+        sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
+                SearchCriteria.Op.EQ);
         sb.done();
 
         SearchCriteria<SyncQueueItemVO> sc = sb.create();
@@ -134,7 +155,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO,
Long>
         SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
         sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
 
-        if (exclusive)
+        if(exclusive)
             return lockRows(sc, null, true);
         return listBy(sc, null);
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index c3ac0e6..86e62dc 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -24,7 +24,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -365,23 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         }
 
         SyncQueueVO queue = null;
-
-        // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
-        // we retry five times until we throw an exception
-        Random random = new Random();
-
-        for (int i = 0; i < 5; i++) {
-            queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
-            if (queue != null) {
-                break;
-            }
-
-            try {
-                Thread.sleep(1000 + random.nextInt(5000));
-            } catch (InterruptedException e) {
-            }
-        }
-
+        queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
         if (queue == null)
             throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/441be43b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
index 09974a1..d8e2674 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -242,18 +242,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
     }
 
     private boolean queueReadyToProcess(SyncQueueVO queueVO) {
-        return true;
-
-        //
-        // TODO
-        //
-        // Need to disable concurrency disable at queue level due to the need to support
-        // job wake-up dispatching task
-        //
-        // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
-        //
-
-        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+        int nActiveItems = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
+        if (nActiveItems < queueVO.getQueueSizeLimit())
+            return true;
+
+        if (s_logger.isDebugEnabled())
+            s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId()
+                    + "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId()
+                    + ") is reaching concurrency limit " + queueVO.getQueueSizeLimit());
+        return false;
     }
 
     @Override


Mime
View raw message