cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [1/3] further refactored jobs
Date Tue, 04 Jun 2013 22:29:54 GMT
Updated Branches:
  refs/heads/vmsync 51f533e97 -> bd0c239f6


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
new file mode 100644
index 0000000..95f01e5
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMBeanImpl.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.TimeZone;
+
+import javax.management.StandardMBean;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJobMBean;
+
+import com.cloud.utils.DateUtil;
+
+public class AsyncJobMBeanImpl extends StandardMBean implements AsyncJobMBean {
+	private AsyncJob _job;
+	
+	public AsyncJobMBeanImpl(AsyncJob job) {
+		super(AsyncJobMBean.class, false);
+		
+		_job = job;
+	}
+	
+	public long getAccountId() {
+		return _job.getAccountId();
+	}
+	
+	public long getUserId() {
+		return _job.getUserId();
+	}
+	
+	public String getCmd() {
+		return _job.getCmd();
+	}
+	
+	public String getCmdInfo() {
+		return _job.getCmdInfo();
+	}
+	
+	public String getStatus() {
+		int jobStatus = _job.getStatus();
+		switch(jobStatus) {
+		case AsyncJobConstants.STATUS_SUCCEEDED :
+			return "Completed";
+		
+		case AsyncJobConstants.STATUS_IN_PROGRESS:
+			return "In preogress";
+			
+		case AsyncJobConstants.STATUS_FAILED:
+			return "failed";
+		}
+		
+		return "Unknow";
+	}
+	
+	public int getProcessStatus() {
+		return _job.getProcessStatus();
+	}
+	
+	public int getResultCode() {
+		return _job.getResultCode();
+	}
+	
+	public String getResult() {
+		return _job.getResult();
+	}
+	
+	public String getInstanceType() {
+		if(_job.getInstanceType() != null)
+			return _job.getInstanceType().toString();
+		return "N/A";
+	}
+	
+	public String getInstanceId() {
+		if(_job.getInstanceId() != null)
+			return String.valueOf(_job.getInstanceId());
+		return "N/A";
+	}
+	
+	public String getInitMsid() {
+		if(_job.getInitMsid() != null) {
+			return String.valueOf(_job.getInitMsid());
+		}
+		return "N/A";
+	}
+	
+	public String getCreateTime() {
+		Date time = _job.getCreated();
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getLastUpdateTime() {
+		Date time = _job.getLastUpdated();
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getLastPollTime() {
+		Date time = _job.getLastPolled();
+	
+		if(time != null)
+			return DateUtil.getDateDisplayString(TimeZone.getDefault(), time);
+		return "N/A";
+	}
+	
+	public String getSyncQueueId() {
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null && item.getQueueId() != null) {
+			return String.valueOf(item.getQueueId());
+		}
+		return "N/A";
+	}
+	
+	public String getSyncQueueContentType() {
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null) {
+			return item.getContentType();
+		}
+		return "N/A";
+	}
+	
+	public String getSyncQueueContentId() {
+		SyncQueueItem item = _job.getSyncSource();
+		if(item != null && item.getContentId() != null) {
+			return String.valueOf(item.getContentId());
+		}
+		return "N/A";
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
new file mode 100644
index 0000000..7a11195
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
+import org.apache.cloudstack.framework.jobs.AsyncJob.Topics;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+
+import com.cloud.utils.component.ManagerBase;
+
+public class AsyncJobMonitor extends ManagerBase {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
+    
+    @Inject private MessageBus _messageBus;
+	
+	private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
+	private final Timer _timer = new Timer();
+	
+	private volatile int _activePoolThreads = 0;
+	private volatile int _activeInplaceThreads = 0;
+	
+	// configuration
+	private long _inactivityCheckIntervalMs = 60000;
+	private long _inactivityWarningThresholdMs = 90000;
+	
+	public AsyncJobMonitor() {
+	}
+	
+	public long getInactivityCheckIntervalMs() {
+		return _inactivityCheckIntervalMs;
+	}
+	
+	public void setInactivityCheckIntervalMs(long intervalMs) {
+		_inactivityCheckIntervalMs = intervalMs;
+	}
+	
+	public long getInactivityWarningThresholdMs() {
+		return _inactivityWarningThresholdMs;
+	}
+	
+	public void setInactivityWarningThresholdMs(long thresholdMs) {
+		_inactivityWarningThresholdMs = thresholdMs;
+	}
+	
+    @MessageHandler(topic = AsyncJob.Topics.JOB_HEARTBEAT)
+	public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
+		if(args != null && args instanceof Long) {
+			synchronized(this) {
+				ActiveTaskRecord record = _activeTasks.get(args);
+				if(record != null) {
+					record.updateJobHeartbeatTick();
+				}
+			}
+		}
+	}
+	
+	private void heartbeat() {
+		synchronized(this) {
+			for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
+				if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
+					s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+						+ entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
+				}
+			}
+		}
+	}
+	
+	@Override
+	public boolean configure(String name, Map<String, Object> params)
+			throws ConfigurationException {
+		
+        _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
+		_timer.scheduleAtFixedRate(new TimerTask() {
+
+			@Override
+			public void run() {
+				heartbeat();
+			}
+			
+		}, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
+		return true;
+	}
+	
+	public void registerActiveTask(long jobId) {
+		synchronized(this) {
+			assert(_activeTasks.get(jobId) == null);
+			
+			long threadId = Thread.currentThread().getId();
+			boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobConstants.JOB_POOL_THREAD_PREFIX);
+			ActiveTaskRecord record = new ActiveTaskRecord(threadId, jobId, fromPoolThread);
+			_activeTasks.put(jobId, record);
+			if(fromPoolThread)
+				_activePoolThreads++;
+			else
+				_activeInplaceThreads++;
+		}
+	}
+	
+	public void unregisterActiveTask(long jobId) {
+		synchronized(this) {
+			ActiveTaskRecord record = _activeTasks.get(jobId);
+			assert(record != null);
+			if(record != null) {
+				if(record.isPoolThread())
+					_activePoolThreads--;
+				else
+					_activeInplaceThreads--;
+				
+				_activeTasks.remove(jobId);
+			}
+		}
+	}
+	
+	public int getActivePoolThreads() {
+		return _activePoolThreads;
+	}
+	
+	public int getActiveInplaceThread() {
+		return _activeInplaceThreads;
+	}
+	
+	private static class ActiveTaskRecord {
+		long _jobId;
+		long _threadId;
+		boolean _fromPoolThread;
+		long _jobLastHeartbeatTick;
+		
+		public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
+			_threadId = threadId;
+			_jobId = jobId;
+			_fromPoolThread = fromPoolThread;
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long getThreadId() {
+			return _threadId;
+		}
+		
+		public long getJobId() {
+			return _jobId;
+		}
+		
+		public boolean isPoolThread() {
+			return _fromPoolThread;
+		}
+		
+		public void updateJobHeartbeatTick() {
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long millisSinceLastJobHeartbeat() {
+			return System.currentTimeMillis() - _jobLastHeartbeatTick;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
new file mode 100644
index 0000000..f2ea4ac
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.UUID;
+
+import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.persistence.Transient;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.jobs.Job;
+
+import com.cloud.utils.UuidUtils;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job")
+@Inheritance(strategy=InheritanceType.JOINED)
+@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
+public class AsyncJobVO implements AsyncJob, Job {
+	
+	@Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+	
+    @Column(name="job_type", length=32)
+    protected String type;
+    
+    @Column(name="job_dispatcher", length=64)
+    protected String dispatcher;
+    
+    @Column(name="job_pending_signals")
+    protected int pendingSignals;
+    
+    @Column(name="user_id")
+    private long userId;
+    
+    @Column(name="account_id")
+    private long accountId;
+    
+	@Column(name="job_cmd")
+    private String cmd;
+
+	@Column(name="job_cmd_ver")
+    private int cmdVersion;
+	
+    @Column(name="job_cmd_info", length=65535)
+    private String cmdInfo;
+  
+    @Column(name="job_status")
+    private int status;
+    
+    @Column(name="job_process_status")
+    private int processStatus;
+    
+    @Column(name="job_result_code")
+    private int resultCode;
+    
+    @Column(name="job_result", length=65535)
+    private String result;
+    
+    @Column(name="instance_type", length=64)
+    private String instanceType;
+    
+	@Column(name="instance_id", length=64)
+    private Long instanceId;
+    
+    @Column(name="job_init_msid")
+    private Long initMsid;
+
+    @Column(name="job_complete_msid")
+    private Long completeMsid;
+    
+    @Column(name="job_executing_msid")
+    private Long executingMsid;
+
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created;
+    
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+    
+    @Column(name="last_polled")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastPolled;
+    
+    @Column(name=GenericDao.REMOVED_COLUMN)
+    private Date removed;
+    
+    @Column(name="uuid")
+    private String uuid;
+
+    @Transient
+    private SyncQueueItem syncSource = null;
+
+    public AsyncJobVO() {
+        uuid = UUID.randomUUID().toString();
+    }
+
+    public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
+		this.userId = userId;
+		this.accountId = accountId;
+		this.cmd = cmd;
+		this.cmdInfo = cmdInfo;
+	    uuid = UUID.randomUUID().toString();
+	    this.instanceId = instanceId;
+	    this.instanceType = instanceType;
+    }
+
+    @Override
+    public long getId() {
+		return id;
+	}
+
+	public void setId(Long id) {
+		this.id = id;
+	}
+	
+    @Override
+    public String getShortUuid() {
+        return UuidUtils.first(uuid);
+    }
+
+	@Override
+	public String getType() {
+		return type;
+	}
+	
+	public void setType(String type) {
+		this.type = type;
+	}
+	
+	@Override
+	public String getDispatcher() {
+		return dispatcher;
+	}
+	
+	public void setDispatcher(String dispatcher) {
+		this.dispatcher = dispatcher;
+	}
+	
+	@Override
+	public int getPendingSignals() {
+		return pendingSignals;
+	}
+	
+	public void setPendingSignals(int signals) {
+		pendingSignals = signals;
+	}
+
+	@Override
+    public long getUserId() {
+		return userId;
+	}
+
+	public void setUserId(long userId) {
+		this.userId = userId;
+	}
+
+	@Override
+    public long getAccountId() {
+		return accountId;
+	}
+
+	public void setAccountId(long accountId) {
+		this.accountId = accountId;
+	}
+
+	@Override
+    public String getCmd() {
+		return cmd;
+	}
+
+	public void setCmd(String cmd) {
+		this.cmd = cmd;
+	}
+	
+	@Override
+    public int getCmdVersion() {
+		return cmdVersion;
+	}
+	
+	public void setCmdVersion(int version) {
+		cmdVersion = version;
+	}
+
+	@Override
+    public String getCmdInfo() {
+		return cmdInfo;
+	}
+
+	public void setCmdInfo(String cmdInfo) {
+		this.cmdInfo = cmdInfo;
+	}
+	
+	@Override
+    public int getStatus() {
+		return status;
+	}
+
+	public void setStatus(int status) {
+		this.status = status;
+	}
+	
+	@Override
+    public int getProcessStatus() {
+		return processStatus;
+	}
+	
+	public void setProcessStatus(int status) {
+		processStatus = status;
+	}
+	
+	@Override
+    public int getResultCode() {
+		return resultCode;
+	}
+	
+	public void setResultCode(int resultCode) {
+		this.resultCode = resultCode;
+	}
+
+	@Override
+    public String getResult() {
+		return result;
+	}
+
+	public void setResult(String result) {
+		this.result = result;
+	}
+
+	@Override
+    public Long getInitMsid() {
+		return initMsid;
+	}
+
+	@Override
+	public void setInitMsid(Long initMsid) {
+		this.initMsid = initMsid;
+	}
+	
+	@Override
+	public Long getExecutingMsid() {
+		return executingMsid;
+	}
+	
+	public void setExecutingMsid(Long executingMsid) {
+		this.executingMsid = executingMsid;
+	}
+
+	@Override
+    public Long getCompleteMsid() {
+		return completeMsid;
+	}
+
+	@Override
+	public void setCompleteMsid(Long completeMsid) {
+		this.completeMsid = completeMsid;
+	}
+
+	@Override
+    public Date getCreated() {
+		return created;
+	}
+
+	public void setCreated(Date created) {
+		this.created = created;
+	}
+
+	@Override
+    public Date getLastUpdated() {
+		return lastUpdated;
+	}
+
+	public void setLastUpdated(Date lastUpdated) {
+		this.lastUpdated = lastUpdated;
+	}
+
+	@Override
+    public Date getLastPolled() {
+		return lastPolled;
+	}
+
+	public void setLastPolled(Date lastPolled) {
+		this.lastPolled = lastPolled;
+	}
+
+	@Override
+    public Date getRemoved() {
+		return removed;
+	}
+
+	public void setRemoved(Date removed) {
+		this.removed = removed;
+	}
+	
+    @Override
+    public String getInstanceType() {
+		return instanceType;
+	}
+
+	public void setInstanceType(String instanceType) {
+		this.instanceType = instanceType;
+	}
+
+	@Override
+    public Long getInstanceId() {
+		return instanceId;
+	}
+
+	public void setInstanceId(Long instanceId) {
+		this.instanceId = instanceId;
+	}
+	
+	@Override
+    public SyncQueueItem getSyncSource() {
+        return syncSource;
+    }
+    
+	@Override
+    public void setSyncSource(SyncQueueItem syncSource) {
+        this.syncSource = syncSource;
+    }
+    
+    @Override
+    public String getUuid() {
+    	return uuid;
+    }
+    
+    public void setUuid(String uuid) {
+    	this.uuid = uuid;
+    }
+    
+	@Override
+    public String toString() {
+		StringBuffer sb = new StringBuffer();
+		sb.append("AsyncJobVO {id:").append(getId());
+		sb.append(", userId: ").append(getUserId());
+		sb.append(", accountId: ").append(getAccountId());
+		sb.append(", instanceType: ").append(getInstanceType());
+		sb.append(", instanceId: ").append(getInstanceId());
+		sb.append(", cmd: ").append(getCmd());
+		sb.append(", cmdInfo: ").append(getCmdInfo());
+		sb.append(", cmdVersion: ").append(getCmdVersion());
+		sb.append(", status: ").append(getStatus());
+		sb.append(", processStatus: ").append(getProcessStatus());
+		sb.append(", resultCode: ").append(getResultCode());
+		sb.append(", result: ").append(getResult());
+		sb.append(", initMsid: ").append(getInitMsid());
+		sb.append(", completeMsid: ").append(getCompleteMsid());
+		sb.append(", lastUpdated: ").append(getLastUpdated());
+		sb.append(", lastPolled: ").append(getLastPolled());
+		sb.append(", created: ").append(getCreated());
+		sb.append("}");
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
new file mode 100644
index 0000000..17dd1cc
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+
+/**
+ * Note: toPairList and appendPairList only support simple POJO objects currently
+ */
+public class JobSerializerHelper {
+    private static final Logger s_logger = Logger.getLogger(JobSerializerHelper.class);
+    public static String token = "/";
+
+    private static Gson s_gson;
+    static {
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        gsonBuilder.setVersion(1.5);
+        s_logger.debug("Job GSON Builder initialized.");
+        s_gson = gsonBuilder.create();
+    }
+
+    public static String toSerializedString(Object result) {
+        if(result != null) {
+            Class<?> clz = result.getClass();
+            return clz.getName() + token + s_gson.toJson(result);
+        }
+        return null;
+    }
+
+    public static Object fromSerializedString(String result) {
+        try {
+            if(result != null && !result.isEmpty()) {
+
+                String[] serializedParts = result.split(token);
+
+                if (serializedParts.length < 2) {
+                    return null;
+                }
+                String clzName = serializedParts[0];
+                String nameField = null;
+                String content = null;
+                if (serializedParts.length == 2) {
+                    content = serializedParts[1];
+                } else {
+                    nameField = serializedParts[1];
+                    int index = result.indexOf(token + nameField + token);
+                    content = result.substring(index + nameField.length() + 2);
+                }
+
+                Class<?> clz;
+                try {
+                    clz = Class.forName(clzName);
+                } catch (ClassNotFoundException e) {
+                    return null;
+                }
+
+                Object obj = s_gson.fromJson(content, clz);
+                return obj;
+            }
+            return null;
+        } catch(RuntimeException e) {
+            throw new CloudRuntimeException("Unable to deserialize: " + result, e);
+        }
+    }
+    
+    public static String toObjectSerializedString(Serializable object) {
+    	assert(object != null);
+    	
+    	ByteArrayOutputStream bs = new ByteArrayOutputStream();
+    	try {
+    		ObjectOutputStream os = new ObjectOutputStream(bs);
+    		os.writeObject(object);
+    		os.close();
+    		bs.close();
+    		
+    		return Base64.encodeBase64URLSafeString(bs.toByteArray());
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + object, e);
+    	}
+    }
+    
+    public static Object fromObjectSerializedString(String base64EncodedString) {
+    	if(base64EncodedString == null)
+    		return null;
+    	
+    	byte[] content = Base64.decodeBase64(base64EncodedString);
+    	ByteArrayInputStream bs = new ByteArrayInputStream(content);
+    	try {
+    		ObjectInputStream is = new ObjectInputStream(bs);
+    		Object obj = is.readObject();
+    		is.close();
+    		bs.close();
+    		return obj;
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+        } catch (ClassNotFoundException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+    	}
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
new file mode 100644
index 0000000..04519e7
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+public interface SyncQueueItem {
+    public final String AsyncJobContentType = "AsyncJob";
+
+    /**
+     * @return queue item id
+     */
+    long getId();
+
+    /**
+     * @return queue id
+     */
+    Long getQueueId();
+
+    /**
+     * @return subject object type pointed by the queue item
+     */
+    String getContentType();
+    
+    /**
+     * @return subject object id pointed by the queue item
+     */
+    Long getContentId();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
new file mode 100644
index 0000000..f8bba02
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue_item")
+public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+    
+    @Column(name="queue_id")
+    private Long queueId;
+    
+    @Column(name="content_type")
+    private String contentType;
+    
+    @Column(name="content_id")
+    private Long contentId;
+    
+    @Column(name="queue_proc_msid")
+    private Long lastProcessMsid;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+    
+    @Column(name="queue_proc_time")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastProcessTime;
+    
+    @Column(name="created")
+    private Date created;
+    
+    public long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    @Override
+    public Long getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Long queueId) {
+        this.queueId = queueId;
+    }
+
+    @Override
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+    
+    @Override
+    public Long getContentId() {
+        return contentId;
+    }
+
+    public void setContentId(Long contentId) {
+        this.contentId = contentId;
+    }
+
+    public Long getLastProcessMsid() {
+        return lastProcessMsid;
+    }
+
+    public void setLastProcessMsid(Long lastProcessMsid) {
+        this.lastProcessMsid = lastProcessMsid;
+    }
+
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+
+    public void setLastProcessNumber(Long lastProcessNumber) {
+        this.lastProcessNumber = lastProcessNumber;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
+        sb.append(", contentType: ").append(getContentType());
+        sb.append(", contentId: ").append(getContentId());
+        sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
+        sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastProcessTime: ").append(getLastProcessTime());
+        sb.append(", created: ").append(getCreated());
+        sb.append("}");
+        return sb.toString();
+    }
+
+       public Date getLastProcessTime() {
+            return lastProcessTime;
+        }
+
+        public void setLastProcessTime(Date lastProcessTime) {
+            this.lastProcessTime = lastProcessTime;
+        }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
new file mode 100644
index 0000000..202a704
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.List;
+
+import com.cloud.utils.component.Manager;
+
+public interface SyncQueueManager extends Manager {
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
+    public void purgeItem(long queueItemId);
+    public void returnItem(long queueItemId);
+
+	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+    void purgeAsyncJobQueueItemId(long asyncJobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
new file mode 100644
index 0000000..b9b5d6b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
+    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+    @Inject private SyncQueueDao _syncQueueDao;
+    @Inject private SyncQueueItemDao _syncQueueItemDao;
+
+    @Override
+    @DB
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
+        Transaction txn = Transaction.currentTxn();
+        try {
+            txn.start();
+
+            _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+            SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+            if(queueVO == null)
+                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+
+            queueVO.setQueueSizeLimit(queueSizeLimit);
+            _syncQueueDao.update(queueVO.getId(), queueVO);
+
+            Date dt = DateUtil.currentGMTTime();
+            SyncQueueItemVO item = new SyncQueueItemVO();
+            item.setQueueId(queueVO.getId());
+            item.setContentType(itemType);
+            item.setContentId(itemId);
+            item.setCreated(dt);
+
+            _syncQueueItemDao.persist(item);
+            txn.commit();
+
+            return queueVO;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txn.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+            if(queueVO == null) {
+                s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+                txt.commit();
+                return null;
+            }
+
+            if(queueReadyToProcess(queueVO)) {
+                SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+                if(itemVO != null) {
+                    Long processNumber = queueVO.getLastProcessNumber();
+                    if(processNumber == null)
+                        processNumber = new Long(1);
+                    else
+                        processNumber = processNumber + 1;
+                    Date dt = DateUtil.currentGMTTime();
+                    queueVO.setLastProcessNumber(processNumber);
+                    queueVO.setLastUpdated(dt);
+                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                    itemVO.setLastProcessMsid(msid);
+                    itemVO.setLastProcessNumber(processNumber);
+                    itemVO.setLastProcessTime(dt);
+                    _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+                    txt.commit();
+                    return itemVO;
+                } else {
+                    if(s_logger.isDebugEnabled())
+                        s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+                }
+            } else {
+                if(s_logger.isDebugEnabled())
+                    s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+
+        return null;
+    }
+
+    @Override
+    @DB
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+        List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
+            if(l != null && l.size() > 0) {
+                for(SyncQueueItemVO item : l) {
+                    SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+                    SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+                    if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
+                        Long processNumber = queueVO.getLastProcessNumber();
+                        if(processNumber == null)
+                            processNumber = new Long(1);
+                        else
+                            processNumber = processNumber + 1;
+
+                        Date dt = DateUtil.currentGMTTime();
+                        queueVO.setLastProcessNumber(processNumber);
+                        queueVO.setLastUpdated(dt);
+                        queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                        _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                        itemVO.setLastProcessMsid(msid);
+                        itemVO.setLastProcessNumber(processNumber);
+                        itemVO.setLastProcessTime(dt);
+                        _syncQueueItemDao.update(item.getId(), itemVO);
+
+                        resultList.add(item);
+                    }
+                }
+            }
+            txt.commit();
+            return resultList;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public void purgeItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                _syncQueueItemDao.expunge(itemVO.getId());
+
+                // if item is active, reset queue information
+                if (itemVO.getLastProcessMsid() != null) {
+                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                    // decrement the count
+                    assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+                    queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+                }
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    @DB
+    public void returnItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                itemVO.setLastProcessMsid(null);
+                itemVO.setLastProcessNumber(null);
+                itemVO.setLastProcessTime(null);
+                _syncQueueItemDao.update(queueItemId, itemVO);
+
+                queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                _syncQueueDao.update(queueVO.getId(), queueVO);
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+        return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+        return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
+    }
+
+    private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+    	return true;
+    	
+    	//
+    	// TODO
+    	//
+    	// Need to disable concurrency disable at queue level due to the need to support
+    	// job wake-up dispatching task
+    	//
+    	// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
+    	//
+    	
+        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+    }
+
+    @Override
+    public void purgeAsyncJobQueueItemId(long asyncJobId) {
+        Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+        if (itemId != null) {
+            purgeItem(itemId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
new file mode 100644
index 0000000..4fd4740
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+import java.util.Date;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue")
+public class SyncQueueVO implements InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id;
+
+    @Column(name="sync_objtype")
+
+    private String syncObjType;
+
+    @Column(name="sync_objid")
+    private Long syncObjId;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+
+    @Column(name="created")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date created;
+
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    @Column(name="queue_size")
+    private long queueSize = 0;
+
+    @Column(name="queue_size_limit")
+    private long queueSizeLimit = 0;
+
+    public long getId() {
+        return id;
+    }
+
+    public String getSyncObjType() {
+        return syncObjType;
+    }
+
+    public void setSyncObjType(String syncObjType) {
+        this.syncObjType = syncObjType;
+    }
+
+    public Long getSyncObjId() {
+        return syncObjId;
+    }
+
+    public void setSyncObjId(Long syncObjId) {
+        this.syncObjId = syncObjId;
+    }
+    
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+    
+    public void setLastProcessNumber(Long number) {
+        lastProcessNumber = number;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SyncQueueVO {id:").append(getId());
+        sb.append(", syncObjType: ").append(getSyncObjType());
+        sb.append(", syncObjId: ").append(getSyncObjId());
+        sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastUpdated: ").append(getLastUpdated());
+        sb.append(", created: ").append(getCreated());
+        sb.append(", count: ").append(getQueueSize());
+        sb.append("}");
+        return sb.toString();
+    }
+
+    public long getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(long queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    public long getQueueSizeLimit() {
+        return queueSizeLimit;
+    }
+
+    public void setQueueSizeLimit(long queueSizeLimit) {
+        this.queueSizeLimit = queueSizeLimit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiGsonHelper.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiGsonHelper.java b/server/src/com/cloud/api/ApiGsonHelper.java
index 6163860..c24808b 100644
--- a/server/src/com/cloud/api/ApiGsonHelper.java
+++ b/server/src/com/cloud/api/ApiGsonHelper.java
@@ -16,12 +16,27 @@
 // under the License.
 package com.cloud.api;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
 import com.google.gson.GsonBuilder;
+
 import org.apache.cloudstack.api.ResponseObject;
 
-import java.util.Map;
+import com.cloud.serializer.Param;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
 
 public class ApiGsonHelper {
+    private static final Logger s_logger = Logger.getLogger(ApiGsonHelper.class);
     private static final GsonBuilder s_gBuilder;
     static {
         s_gBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
@@ -33,4 +48,107 @@ public class ApiGsonHelper {
     public static GsonBuilder getBuilder() {
         return s_gBuilder;
     }
+
+    public static List<Pair<String, Object>> toPairList(Object o, String name) {
+        List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
+        return appendPairList(l, o, name);
+    }
+
+    public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
+        if (o != null) {
+            Class<?> clz = o.getClass();
+
+            if (clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
+                l.add(new Pair<String, Object>(name, o.toString()));
+                return l;
+            }
+
+            for (Field f : clz.getDeclaredFields()) {
+                if ((f.getModifiers() & Modifier.STATIC) != 0) {
+                    continue;
+                }
+
+                Param param = f.getAnnotation(Param.class);
+                if (param == null) {
+                    continue;
+                }
+
+                String propName = f.getName();
+                if (!param.propName().isEmpty()) {
+                    propName = param.propName();
+                }
+
+                String paramName = param.name();
+                if (paramName.isEmpty()) {
+                    paramName = propName;
+                }
+
+                Method method = getGetMethod(o, propName);
+                if (method != null) {
+                    try {
+                        Object fieldValue = method.invoke(o);
+                        if (fieldValue != null) {
+                            if (f.getType() == Date.class) {
+                                l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
+                            } else {
+                                l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
+                            }
+                        }
+                        //else
+                        //  l.add(new Pair<String, Object>(paramName, ""));
+                    } catch (IllegalArgumentException e) {
+                        s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+
+                    } catch (IllegalAccessException e) {
+                        s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+                    } catch (InvocationTargetException e) {
+                        s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
+                    }
+                }
+            }
+        }
+        return l;
+    }
+
+    private static Method getGetMethod(Object o, String propName) {
+        Method method = null;
+        String methodName = getGetMethodName("get", propName);
+        try {
+            method = o.getClass().getMethod(methodName);
+        } catch (SecurityException e1) {
+            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+        } catch (NoSuchMethodException e1) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName
+                        + ", will check is-prefixed method to see if it is boolean property");
+            }
+        }
+
+        if (method != null) {
+            return method;
+        }
+
+        methodName = getGetMethodName("is", propName);
+        try {
+            method = o.getClass().getMethod(methodName);
+        } catch (SecurityException e1) {
+            s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
+        } catch (NoSuchMethodException e1) {
+            s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
+        }
+        return method;
+    }
+
+    private static String getGetMethodName(String prefix, String fieldName) {
+        StringBuffer sb = new StringBuffer(prefix);
+
+        if (fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
+            return fieldName;
+        } else {
+            sb.append(fieldName.substring(0, 1).toUpperCase());
+            sb.append(fieldName.substring(1));
+        }
+
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index 175c8b8..4db9bb8 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -116,7 +116,7 @@ import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.region.RegionManager;
 
 import com.cloud.api.response.ApiResponseSerializer;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java
index e6efd03..9ca38a9 100644
--- a/server/src/com/cloud/async/AsyncJobExecutionContext.java
+++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java
@@ -20,15 +20,15 @@ import javax.inject.Inject;
 
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
 
 import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.serializer.SerializerHelper;
 import com.cloud.utils.component.ComponentContext;
 
 public class AsyncJobExecutionContext  {
@@ -109,7 +109,7 @@ public class AsyncJobExecutionContext  {
     	
     	AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
     	if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) {
-    		Object exception = SerializerHelper.fromObjectSerializedString(record.getJoinResult());
+    		Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
     		if(exception != null && exception instanceof Exception) {
     			if(exception instanceof InsufficientCapacityException)
     				throw (InsufficientCapacityException)exception;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 42cbae8..f147bb0 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -43,19 +43,19 @@ import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobJournalVO;
-import org.apache.cloudstack.framework.jobs.AsyncJobMBeanImpl;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueItem;
-import org.apache.cloudstack.framework.jobs.SyncQueueItemVO;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMBeanImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.cloudstack.framework.messagebus.PublishScope;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
index edc4216..32d1c19 100644
--- a/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java
@@ -35,8 +35,8 @@ import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 
 import com.cloud.api.ApiDispatcher;
 import com.cloud.api.ApiGsonHelper;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/template/TemplateManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/template/TemplateManagerImpl.java b/server/src/com/cloud/template/TemplateManagerImpl.java
index c811b5e..3317fa1 100755
--- a/server/src/com/cloud/template/TemplateManagerImpl.java
+++ b/server/src/com/cloud/template/TemplateManagerImpl.java
@@ -68,7 +68,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
 import org.apache.cloudstack.framework.async.AsyncCallFuture;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
index a5eceec..8109128 100644
--- a/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
+++ b/server/src/com/cloud/vm/VmWorkJobWakeupDispatcher.java
@@ -29,8 +29,8 @@ import org.apache.log4j.Logger;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobJoinMapVO;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTest.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTest.java b/server/test/com/cloud/vm/VmWorkTest.java
index 3cb78da..d236cc3 100644
--- a/server/test/com/cloud/vm/VmWorkTest.java
+++ b/server/test/com/cloud/vm/VmWorkTest.java
@@ -41,7 +41,6 @@ import com.cloud.deploy.DeploymentPlan;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.InsufficientStorageCapacityException;
-import com.cloud.serializer.SerializerHelper;
 import com.cloud.utils.LogUtils;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ComponentContext;
@@ -49,7 +48,8 @@ import com.cloud.utils.db.Transaction;
 import com.google.gson.Gson;
 
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
 import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step;
@@ -167,10 +167,10 @@ public class VmWorkTest extends TestCase {
 	public void testExceptionSerialization() {
 		InsufficientCapacityException exception = new InsufficientStorageCapacityException("foo", VmWorkJobVO.class, 1L);
 		
-		String encodedString = SerializerHelper.toObjectSerializedString(exception);
+		String encodedString = JobSerializerHelper.toObjectSerializedString(exception);
 		System.out.println(encodedString);
 
-		exception = (InsufficientCapacityException)SerializerHelper.fromObjectSerializedString(encodedString);
+		exception = (InsufficientCapacityException)JobSerializerHelper.fromObjectSerializedString(encodedString);
 		Assert.assertTrue(exception.getScope() == VmWorkJobVO.class);
 		Assert.assertTrue(exception.getMessage().equals("foo"));
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/server/test/com/cloud/vm/VmWorkTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/VmWorkTestConfiguration.java b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
index b436b16..cd0dc2c 100644
--- a/server/test/com/cloud/vm/VmWorkTestConfiguration.java
+++ b/server/test/com/cloud/vm/VmWorkTestConfiguration.java
@@ -20,9 +20,6 @@ import org.mockito.Mockito;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import org.apache.cloudstack.framework.jobs.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.SyncQueueManagerImpl;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobDaoImpl;
 import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
@@ -33,6 +30,9 @@ import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueManagerImpl;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
 import org.apache.cloudstack.vm.jobs.VmWorkJobDaoImpl;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/dd112540/utils/pom.xml
----------------------------------------------------------------------
diff --git a/utils/pom.xml b/utils/pom.xml
index 0690c35..878e91d 100644
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -64,6 +64,11 @@
       <version>${cs.codec.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>${cs.gson.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>
       <version>${cs.collections.version}</version>


Mime
View raw message