cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [1/3] Moved over the new jobs framework from vmsync. This has not been integrated into the server package yet. Will do that next
Date Tue, 30 Jul 2013 22:00:49 GMT
Updated Branches:
  refs/heads/master 730d04508 -> 1e1ee902a


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
new file mode 100644
index 0000000..89bbd86
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.Date;
+import java.util.UUID;
+
+import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.persistence.Transient;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.utils.UuidUtils;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job")
+@Inheritance(strategy=InheritanceType.JOINED)
+@DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32)
+public class AsyncJobVO implements AsyncJob, JobInfo {
+
+    public static final String JOB_DISPATCHER_PSEUDO = "pseudoJobDispatcher";
+    public static final String PSEUDO_JOB_INSTANCE_TYPE = "Thread";
+	
+	@Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private long id;
+	
+    @Column(name="job_type", length=32)
+    protected String type;
+    
+    @Column(name="job_dispatcher", length=64)
+    protected String dispatcher;
+    
+    @Column(name="job_pending_signals")
+    protected int pendingSignals;
+    
+    @Column(name="user_id")
+    private long userId;
+    
+    @Column(name="account_id")
+    private long accountId;
+    
+	@Column(name="job_cmd")
+    private String cmd;
+
+	@Column(name="job_cmd_ver")
+    private int cmdVersion;
+	
+    @Column(name = "related")
+    private String related;
+
+    @Column(name="job_cmd_info", length=65535)
+    private String cmdInfo;
+  
+    @Column(name="job_status")
+    @Enumerated(value = EnumType.ORDINAL)
+    private Status status;
+    
+    @Column(name="job_process_status")
+    private int processStatus;
+    
+    @Column(name="job_result_code")
+    private int resultCode;
+    
+    @Column(name="job_result", length=65535)
+    private String result;
+    
+    @Column(name="instance_type", length=64)
+    private String instanceType;
+    
+	@Column(name="instance_id", length=64)
+    private Long instanceId;
+    
+    @Column(name="job_init_msid")
+    private Long initMsid;
+
+    @Column(name="job_complete_msid")
+    private Long completeMsid;
+    
+    @Column(name="job_executing_msid")
+    private Long executingMsid;
+
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created;
+    
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+    
+    @Column(name="last_polled")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastPolled;
+    
+    @Column(name=GenericDao.REMOVED_COLUMN)
+    private Date removed;
+    
+    @Column(name="uuid")
+    private String uuid;
+
+    @Transient
+    private SyncQueueItem syncSource = null;
+
+    public AsyncJobVO() {
+        uuid = UUID.randomUUID().toString();
+        related = UUID.randomUUID().toString();
+        status = Status.IN_PROGRESS;
+    }
+
+    public AsyncJobVO(String related, long userId, long accountId, String cmd, String cmdInfo, Long instanceId, String instanceType) {
+		this.userId = userId;
+		this.accountId = accountId;
+		this.cmd = cmd;
+		this.cmdInfo = cmdInfo;
+        uuid = UUID.randomUUID().toString();
+        this.related = related;
+	    this.instanceId = instanceId;
+	    this.instanceType = instanceType;
+        status = Status.IN_PROGRESS;
+    }
+
+    @Override
+    public long getId() {
+		return id;
+	}
+
+    public void setId(long id) {
+		this.id = id;
+	}
+	
+    @Override
+    public String getShortUuid() {
+        return UuidUtils.first(uuid);
+    }
+
+    public void setRelated(String related) {
+        this.related = related;
+    }
+
+    @Override
+    public String getRelated() {
+        return related;
+    }
+
+	@Override
+	public String getType() {
+		return type;
+	}
+	
+	public void setType(String type) {
+		this.type = type;
+	}
+	
+	@Override
+	public String getDispatcher() {
+		return dispatcher;
+	}
+	
+	public void setDispatcher(String dispatcher) {
+		this.dispatcher = dispatcher;
+	}
+	
+	@Override
+	public int getPendingSignals() {
+		return pendingSignals;
+	}
+	
+	public void setPendingSignals(int signals) {
+		pendingSignals = signals;
+	}
+
+	@Override
+    public long getUserId() {
+		return userId;
+	}
+
+	public void setUserId(long userId) {
+		this.userId = userId;
+	}
+
+	@Override
+    public long getAccountId() {
+		return accountId;
+	}
+
+	public void setAccountId(long accountId) {
+		this.accountId = accountId;
+	}
+
+	@Override
+    public String getCmd() {
+		return cmd;
+	}
+
+	public void setCmd(String cmd) {
+		this.cmd = cmd;
+	}
+	
+	@Override
+    public int getCmdVersion() {
+		return cmdVersion;
+	}
+	
+	public void setCmdVersion(int version) {
+		cmdVersion = version;
+	}
+
+	@Override
+    public String getCmdInfo() {
+		return cmdInfo;
+	}
+
+	public void setCmdInfo(String cmdInfo) {
+		this.cmdInfo = cmdInfo;
+	}
+	
+	@Override
+    public Status getStatus() {
+		return status;
+	}
+
+    public void setStatus(Status status) {
+		this.status = status;
+	}
+	
+	@Override
+    public int getProcessStatus() {
+		return processStatus;
+	}
+	
+	public void setProcessStatus(int status) {
+		processStatus = status;
+	}
+	
+	@Override
+    public int getResultCode() {
+		return resultCode;
+	}
+	
+	public void setResultCode(int resultCode) {
+		this.resultCode = resultCode;
+	}
+
+	@Override
+    public String getResult() {
+		return result;
+	}
+
+	public void setResult(String result) {
+		this.result = result;
+	}
+
+	@Override
+    public Long getInitMsid() {
+		return initMsid;
+	}
+
+	@Override
+	public void setInitMsid(Long initMsid) {
+		this.initMsid = initMsid;
+	}
+	
+	@Override
+	public Long getExecutingMsid() {
+		return executingMsid;
+	}
+	
+	public void setExecutingMsid(Long executingMsid) {
+		this.executingMsid = executingMsid;
+	}
+
+	@Override
+    public Long getCompleteMsid() {
+		return completeMsid;
+	}
+
+	@Override
+	public void setCompleteMsid(Long completeMsid) {
+		this.completeMsid = completeMsid;
+	}
+
+	@Override
+    public Date getCreated() {
+		return created;
+	}
+
+	public void setCreated(Date created) {
+		this.created = created;
+	}
+
+	@Override
+    public Date getLastUpdated() {
+		return lastUpdated;
+	}
+
+	public void setLastUpdated(Date lastUpdated) {
+		this.lastUpdated = lastUpdated;
+	}
+
+	@Override
+    public Date getLastPolled() {
+		return lastPolled;
+	}
+
+	public void setLastPolled(Date lastPolled) {
+		this.lastPolled = lastPolled;
+	}
+
+    @Override
+    public String getInstanceType() {
+		return instanceType;
+	}
+
+	public void setInstanceType(String instanceType) {
+		this.instanceType = instanceType;
+	}
+
+	@Override
+    public Long getInstanceId() {
+		return instanceId;
+	}
+
+	public void setInstanceId(Long instanceId) {
+		this.instanceId = instanceId;
+	}
+	
+	@Override
+    public SyncQueueItem getSyncSource() {
+        return syncSource;
+    }
+    
+	@Override
+    public void setSyncSource(SyncQueueItem syncSource) {
+        this.syncSource = syncSource;
+    }
+    
+    @Override
+    public String getUuid() {
+    	return uuid;
+    }
+    
+    public void setUuid(String uuid) {
+    	this.uuid = uuid;
+    }
+    
+	@Override
+    public String toString() {
+		StringBuffer sb = new StringBuffer();
+		sb.append("AsyncJobVO {id:").append(getId());
+		sb.append(", userId: ").append(getUserId());
+		sb.append(", accountId: ").append(getAccountId());
+		sb.append(", instanceType: ").append(getInstanceType());
+		sb.append(", instanceId: ").append(getInstanceId());
+		sb.append(", cmd: ").append(getCmd());
+		sb.append(", cmdInfo: ").append(getCmdInfo());
+		sb.append(", cmdVersion: ").append(getCmdVersion());
+		sb.append(", status: ").append(getStatus());
+		sb.append(", processStatus: ").append(getProcessStatus());
+		sb.append(", resultCode: ").append(getResultCode());
+		sb.append(", result: ").append(getResult());
+		sb.append(", initMsid: ").append(getInitMsid());
+		sb.append(", completeMsid: ").append(getCompleteMsid());
+		sb.append(", lastUpdated: ").append(getLastUpdated());
+		sb.append(", lastPolled: ").append(getLastPolled());
+		sb.append(", created: ").append(getCreated());
+		sb.append("}");
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
new file mode 100644
index 0000000..6acc933
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/JobSerializerHelper.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+
+/**
+ * Note: toPairList and appendPairList only support simple POJO objects currently
+ */
+public class JobSerializerHelper {
+    private static final Logger s_logger = Logger.getLogger(JobSerializerHelper.class);
+    public static String token = "/";
+
+    private static Gson s_gson;
+    static {
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        gsonBuilder.setVersion(1.5);
+        s_logger.debug("Job GSON Builder initialized.");
+        gsonBuilder.registerTypeAdapter(Class.class, new ClassTypeAdapter());
+        gsonBuilder.registerTypeAdapter(Throwable.class, new ThrowableTypeAdapter());
+        s_gson = gsonBuilder.create();
+    }
+
+    public static String toSerializedString(Object result) {
+        if(result != null) {
+            Class<?> clz = result.getClass();
+            return clz.getName() + token + s_gson.toJson(result);
+        }
+        return null;
+    }
+
+    public static Object fromSerializedString(String result) {
+        try {
+            if(result != null && !result.isEmpty()) {
+
+                String[] serializedParts = result.split(token);
+
+                if (serializedParts.length < 2) {
+                    return null;
+                }
+                String clzName = serializedParts[0];
+                String nameField = null;
+                String content = null;
+                if (serializedParts.length == 2) {
+                    content = serializedParts[1];
+                } else {
+                    nameField = serializedParts[1];
+                    int index = result.indexOf(token + nameField + token);
+                    content = result.substring(index + nameField.length() + 2);
+                }
+
+                Class<?> clz;
+                try {
+                    clz = Class.forName(clzName);
+                } catch (ClassNotFoundException e) {
+                    return null;
+                }
+
+                Object obj = s_gson.fromJson(content, clz);
+                return obj;
+            }
+            return null;
+        } catch(RuntimeException e) {
+            throw new CloudRuntimeException("Unable to deserialize: " + result, e);
+        }
+    }
+    
+    public static String toObjectSerializedString(Serializable object) {
+    	assert(object != null);
+    	
+    	ByteArrayOutputStream bs = new ByteArrayOutputStream();
+    	try {
+    		ObjectOutputStream os = new ObjectOutputStream(bs);
+    		os.writeObject(object);
+    		os.close();
+    		bs.close();
+    		
+    		return Base64.encodeBase64URLSafeString(bs.toByteArray());
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + object, e);
+    	}
+    }
+    
+    public static Object fromObjectSerializedString(String base64EncodedString) {
+    	if(base64EncodedString == null)
+    		return null;
+    	
+    	byte[] content = Base64.decodeBase64(base64EncodedString);
+    	ByteArrayInputStream bs = new ByteArrayInputStream(content);
+    	try {
+    		ObjectInputStream is = new ObjectInputStream(bs);
+    		Object obj = is.readObject();
+    		is.close();
+    		bs.close();
+    		return obj;
+    	} catch(IOException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+        } catch (ClassNotFoundException e) {
+            throw new CloudRuntimeException("Unable to serialize: " + base64EncodedString, e);
+    	}
+    }
+
+    public static class ClassTypeAdapter implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
+        @Override
+        public JsonElement serialize(Class<?> clazz, Type typeOfResponseObj, JsonSerializationContext ctx) {
+            return new JsonPrimitive(clazz.getName());
+        }
+
+        @Override
+        public Class<?> deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+            String str = arg0.getAsString();
+            try {
+                return Class.forName(str);
+            } catch (ClassNotFoundException e) {
+                throw new CloudRuntimeException("Unable to find class " + str);
+            }
+        }
+    }
+
+    public static class ThrowableTypeAdapter implements JsonSerializer<Throwable>, JsonDeserializer<Throwable> {
+
+        @Override
+        public Throwable deserialize(JsonElement json, Type type, JsonDeserializationContext ctx) throws JsonParseException {
+            JsonObject obj = (JsonObject)json;
+
+            String className = obj.get("class").getAsString();
+            try {
+                Class<Throwable> clazz  = (Class<Throwable>)Class.forName(className);
+                Throwable cause = s_gson.fromJson(obj.get("cause"), Throwable.class);
+                String msg = obj.get("msg").getAsString();
+                Constructor<Throwable> constructor = clazz.getConstructor(String.class, Throwable.class);
+                Throwable th = constructor.newInstance(msg, cause);
+                return th;
+            } catch (ClassNotFoundException e) {
+                throw new JsonParseException("Unable to find " + className);
+            } catch (NoSuchMethodException e) {
+                throw new JsonParseException("Unable to find constructor for " + className);
+            } catch (SecurityException e) {
+                throw new JsonParseException("Unable to get over security " + className);
+            } catch (InstantiationException e) {
+                throw new JsonParseException("Unable to instantiate " + className);
+            } catch (IllegalAccessException e) {
+                throw new JsonParseException("Illegal access to " + className, e);
+            } catch (IllegalArgumentException e) {
+                throw new JsonParseException("Illegal argument to " + className, e);
+            } catch (InvocationTargetException e) {
+                throw new JsonParseException("Cannot invoke " + className, e);
+            }
+        }
+
+        @Override
+        public JsonElement serialize(Throwable th, Type type, JsonSerializationContext ctx) {
+            JsonObject json = new JsonObject();
+
+            json.add("class", new JsonPrimitive(th.getClass().getName()));
+            json.add("cause", s_gson.toJsonTree(th.getCause()));
+            json.add("msg", new JsonPrimitive(th.getMessage()));
+//            json.add("stack", s_gson.toJsonTree(th.getStackTrace()));
+
+            return json;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java
new file mode 100644
index 0000000..03c652c
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.Outcome;
+
+import com.cloud.utils.Predicate;
+public class OutcomeImpl<T> implements Outcome<T> {
+    protected AsyncJob _job;
+    protected Class<T> _clazz;
+    protected String[] _topics;
+    protected Predicate _predicate;
+    protected long _checkIntervalInMs;
+
+    protected T _result;
+
+    private static AsyncJobManagerImpl s_jobMgr;
+
+    public static void init(AsyncJobManagerImpl jobMgr) {
+        s_jobMgr = jobMgr;
+    }
+
+    public OutcomeImpl(Class<T> clazz, AsyncJob job, long checkIntervalInMs, Predicate predicate, String... topics) {
+        _clazz = clazz;
+        _job = job;
+        _topics = topics;
+        _predicate = predicate;
+        _checkIntervalInMs = checkIntervalInMs;
+    }
+
+    @Override
+    public AsyncJob getJob() {
+        return _job;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        s_jobMgr.waitAndCheck(getJob(), _topics, _checkIntervalInMs, -1, _predicate);
+        try {
+            AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(_job.getId());
+        } catch (Throwable e) {
+            throw new ExecutionException("Job task has trouble executing", e);
+        }
+
+        return retrieve();
+    }
+
+    @Override
+    public T get(long timeToWait, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        s_jobMgr.waitAndCheck(getJob(), _topics, _checkIntervalInMs, unit.toMillis(timeToWait), _predicate);
+        try {
+            AsyncJobExecutionContext.getCurrentExecutionContext().disjoinJob(_job.getId());
+        } catch (Throwable e) {
+            throw new ExecutionException("Job task has trouble executing", e);
+        }
+        return retrieve();
+    }
+
+    /**
+     * This method can be overridden by children classes to retrieve the
+     * actual object.
+     */
+    protected T retrieve() {
+        return _result;
+    }
+
+    protected Outcome<T> set(T result) {
+        _result = result;
+        return this;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void execute(Task<T> task) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void execute(Task<T> task, long wait, TimeUnit unit) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public Predicate getPredicate() {
+        return _predicate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
new file mode 100644
index 0000000..04519e7
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItem.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+public interface SyncQueueItem {
+    public final String AsyncJobContentType = "AsyncJob";
+
+    /**
+     * @return queue item id
+     */
+    long getId();
+
+    /**
+     * @return queue id
+     */
+    Long getQueueId();
+
+    /**
+     * @return subject object type pointed by the queue item
+     */
+    String getContentType();
+    
+    /**
+     * @return subject object id pointed by the queue item
+     */
+    Long getContentId();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
new file mode 100644
index 0000000..f8bba02
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueItemVO.java
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue_item")
+public class SyncQueueItemVO implements SyncQueueItem, InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+    
+    @Column(name="queue_id")
+    private Long queueId;
+    
+    @Column(name="content_type")
+    private String contentType;
+    
+    @Column(name="content_id")
+    private Long contentId;
+    
+    @Column(name="queue_proc_msid")
+    private Long lastProcessMsid;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+    
+    @Column(name="queue_proc_time")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastProcessTime;
+    
+    @Column(name="created")
+    private Date created;
+    
+    public long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    @Override
+    public Long getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Long queueId) {
+        this.queueId = queueId;
+    }
+
+    @Override
+    public String getContentType() {
+        return contentType;
+    }
+
+    public void setContentType(String contentType) {
+        this.contentType = contentType;
+    }
+    
+    @Override
+    public Long getContentId() {
+        return contentId;
+    }
+
+    public void setContentId(Long contentId) {
+        this.contentId = contentId;
+    }
+
+    public Long getLastProcessMsid() {
+        return lastProcessMsid;
+    }
+
+    public void setLastProcessMsid(Long lastProcessMsid) {
+        this.lastProcessMsid = lastProcessMsid;
+    }
+
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+
+    public void setLastProcessNumber(Long lastProcessNumber) {
+        this.lastProcessNumber = lastProcessNumber;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
+        sb.append(", contentType: ").append(getContentType());
+        sb.append(", contentId: ").append(getContentId());
+        sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
+        sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastProcessTime: ").append(getLastProcessTime());
+        sb.append(", created: ").append(getCreated());
+        sb.append("}");
+        return sb.toString();
+    }
+
+       public Date getLastProcessTime() {
+            return lastProcessTime;
+        }
+
+        public void setLastProcessTime(Date lastProcessTime) {
+            this.lastProcessTime = lastProcessTime;
+        }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
new file mode 100644
index 0000000..202a704
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManager.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.List;
+
+import com.cloud.utils.component.Manager;
+
+public interface SyncQueueManager extends Manager {
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit);
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid);
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems);
+    public void purgeItem(long queueItemId);
+    public void returnItem(long queueItemId);
+
+	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+    void purgeAsyncJobQueueItemId(long asyncJobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
new file mode 100644
index 0000000..b9b5d6b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
+import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManager {
+    public static final Logger s_logger = Logger.getLogger(SyncQueueManagerImpl.class.getName());
+
+    @Inject private SyncQueueDao _syncQueueDao;
+    @Inject private SyncQueueItemDao _syncQueueItemDao;
+
+    @Override
+    @DB
+    public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit) {
+        Transaction txn = Transaction.currentTxn();
+        try {
+            txn.start();
+
+            _syncQueueDao.ensureQueue(syncObjType, syncObjId);
+            SyncQueueVO queueVO = _syncQueueDao.find(syncObjType, syncObjId);
+            if(queueVO == null)
+                throw new CloudRuntimeException("Unable to queue item into DB, DB is full?");
+
+            queueVO.setQueueSizeLimit(queueSizeLimit);
+            _syncQueueDao.update(queueVO.getId(), queueVO);
+
+            Date dt = DateUtil.currentGMTTime();
+            SyncQueueItemVO item = new SyncQueueItemVO();
+            item.setQueueId(queueVO.getId());
+            item.setContentType(itemType);
+            item.setContentId(itemId);
+            item.setCreated(dt);
+
+            _syncQueueItemDao.persist(item);
+            txn.commit();
+
+            return queueVO;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txn.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public SyncQueueItemVO dequeueFromOne(long queueId, Long msid) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
+            if(queueVO == null) {
+                s_logger.error("Sync queue(id: " + queueId + ") does not exist");
+                txt.commit();
+                return null;
+            }
+
+            if(queueReadyToProcess(queueVO)) {
+                SyncQueueItemVO itemVO = _syncQueueItemDao.getNextQueueItem(queueVO.getId());
+                if(itemVO != null) {
+                    Long processNumber = queueVO.getLastProcessNumber();
+                    if(processNumber == null)
+                        processNumber = new Long(1);
+                    else
+                        processNumber = processNumber + 1;
+                    Date dt = DateUtil.currentGMTTime();
+                    queueVO.setLastProcessNumber(processNumber);
+                    queueVO.setLastUpdated(dt);
+                    queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                    itemVO.setLastProcessMsid(msid);
+                    itemVO.setLastProcessNumber(processNumber);
+                    itemVO.setLastProcessTime(dt);
+                    _syncQueueItemDao.update(itemVO.getId(), itemVO);
+
+                    txt.commit();
+                    return itemVO;
+                } else {
+                    if(s_logger.isDebugEnabled())
+                        s_logger.debug("Sync queue (" + queueId + ") is currently empty");
+                }
+            } else {
+                if(s_logger.isDebugEnabled())
+                    s_logger.debug("There is a pending process in sync queue(id: " + queueId + ")");
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+
+        return null;
+    }
+
+    @Override
+    @DB
+    public List<SyncQueueItemVO> dequeueFromAny(Long msid, int maxItems) {
+
+        List<SyncQueueItemVO> resultList = new ArrayList<SyncQueueItemVO>();
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
+            if(l != null && l.size() > 0) {
+                for(SyncQueueItemVO item : l) {
+                    SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
+                    SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
+                    if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
+                        Long processNumber = queueVO.getLastProcessNumber();
+                        if(processNumber == null)
+                            processNumber = new Long(1);
+                        else
+                            processNumber = processNumber + 1;
+
+                        Date dt = DateUtil.currentGMTTime();
+                        queueVO.setLastProcessNumber(processNumber);
+                        queueVO.setLastUpdated(dt);
+                        queueVO.setQueueSize(queueVO.getQueueSize() + 1);
+                        _syncQueueDao.update(queueVO.getId(), queueVO);
+
+                        itemVO.setLastProcessMsid(msid);
+                        itemVO.setLastProcessNumber(processNumber);
+                        itemVO.setLastProcessTime(dt);
+                        _syncQueueItemDao.update(item.getId(), itemVO);
+
+                        resultList.add(item);
+                    }
+                }
+            }
+            txt.commit();
+            return resultList;
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+        return null;
+    }
+
+    @Override
+    @DB
+    public void purgeItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                _syncQueueItemDao.expunge(itemVO.getId());
+
+                // if item is active, reset queue information
+                if (itemVO.getLastProcessMsid() != null) {
+                    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                    // decrement the count
+                    assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+                    queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+                    _syncQueueDao.update(queueVO.getId(), queueVO);
+                }
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    @DB
+    public void returnItem(long queueItemId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
+            if(itemVO != null) {
+                SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
+
+                itemVO.setLastProcessMsid(null);
+                itemVO.setLastProcessNumber(null);
+                itemVO.setLastProcessTime(null);
+                _syncQueueItemDao.update(queueItemId, itemVO);
+
+                queueVO.setLastUpdated(DateUtil.currentGMTTime());
+                _syncQueueDao.update(queueVO.getId(), queueVO);
+            }
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception: ", e);
+            txt.rollback();
+        }
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+        return _syncQueueItemDao.getActiveQueueItems(msid, exclusive);
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
+        return _syncQueueItemDao.getBlockedQueueItems(thresholdMs, exclusive);
+    }
+
+    private boolean queueReadyToProcess(SyncQueueVO queueVO) {
+    	return true;
+    	
+    	//
+    	// TODO
+    	//
+    	// Need to disable concurrency disable at queue level due to the need to support
+    	// job wake-up dispatching task
+    	//
+    	// Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
+    	//
+    	
+        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+    }
+
+    @Override
+    public void purgeAsyncJobQueueItemId(long asyncJobId) {
+        Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+        if (itemId != null) {
+            purgeItem(itemId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
new file mode 100644
index 0000000..4fd4740
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueVO.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.impl;
+
+import org.apache.cloudstack.api.InternalIdentity;
+
+import java.util.Date;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+@Entity
+@Table(name="sync_queue")
+public class SyncQueueVO implements InternalIdentity {
+
+    @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id;
+
+    @Column(name="sync_objtype")
+
+    private String syncObjType;
+
+    @Column(name="sync_objid")
+    private Long syncObjId;
+
+    @Column(name="queue_proc_number")
+    private Long lastProcessNumber;
+
+    @Column(name="created")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date created;
+
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    @Column(name="queue_size")
+    private long queueSize = 0;
+
+    @Column(name="queue_size_limit")
+    private long queueSizeLimit = 0;
+
+    public long getId() {
+        return id;
+    }
+
+    public String getSyncObjType() {
+        return syncObjType;
+    }
+
+    public void setSyncObjType(String syncObjType) {
+        this.syncObjType = syncObjType;
+    }
+
+    public Long getSyncObjId() {
+        return syncObjId;
+    }
+
+    public void setSyncObjId(Long syncObjId) {
+        this.syncObjId = syncObjId;
+    }
+    
+    public Long getLastProcessNumber() {
+        return lastProcessNumber;
+    }
+    
+    public void setLastProcessNumber(Long number) {
+        lastProcessNumber = number;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SyncQueueVO {id:").append(getId());
+        sb.append(", syncObjType: ").append(getSyncObjType());
+        sb.append(", syncObjId: ").append(getSyncObjId());
+        sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
+        sb.append(", lastUpdated: ").append(getLastUpdated());
+        sb.append(", created: ").append(getCreated());
+        sb.append(", count: ").append(getQueueSize());
+        sb.append("}");
+        return sb.toString();
+    }
+
+    public long getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(long queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    public long getQueueSizeLimit() {
+        return queueSizeLimit;
+    }
+
+    public void setQueueSizeLimit(long queueSizeLimit) {
+        this.queueSizeLimit = queueSizeLimit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 91b0343..bc72aff 100755
--- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -713,12 +713,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO vo : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost vo : nodeList) {
             s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
             long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
             _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index faf3e71..3aceec4 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -61,7 +61,7 @@ import com.cloud.api.ApiGsonHelper;
 import com.cloud.api.ApiSerializerHelper;
 import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.domain.DomainVO;
@@ -823,12 +823,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for(ManagementServerHostVO msHost : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost msHost : nodeList) {
             Transaction txn = Transaction.open(Transaction.CLOUD_DB);
             try {
                 txn.start();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
index 71c1a4d..93de351 100755
--- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
+++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
@@ -37,7 +37,7 @@ import org.apache.cloudstack.context.ServerContexts;
 import com.cloud.agent.AgentManager;
 import com.cloud.alert.AlertManager;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.dc.ClusterDetailsDao;
@@ -866,12 +866,12 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements HighAvai
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
             _haDao.releaseWorkItems(node.getMsid());
         }
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/server/LockMasterListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/server/LockMasterListener.java b/server/src/com/cloud/server/LockMasterListener.java
index ee9c9a9..8bd64bb 100644
--- a/server/src/com/cloud/server/LockMasterListener.java
+++ b/server/src/com/cloud/server/LockMasterListener.java
@@ -19,7 +19,7 @@ package com.cloud.server;
 import java.util.List;
 
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.utils.db.Merovingian2;
 
 /**
@@ -34,12 +34,12 @@ public class LockMasterListener implements ClusterManagerListener {
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
             _lockMaster.cleanupForServer(node.getMsid());
         }
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/storage/StorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/StorageManagerImpl.java b/server/src/com/cloud/storage/StorageManagerImpl.java
index 20d5a62..a8cfcc0 100755
--- a/server/src/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/com/cloud/storage/StorageManagerImpl.java
@@ -48,9 +48,9 @@ import org.apache.cloudstack.api.command.admin.storage.AddImageStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
-import org.apache.cloudstack.api.command.admin.storage.DeleteSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeletePoolCmd;
+import org.apache.cloudstack.api.command.admin.storage.DeleteSecondaryStagingStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
@@ -100,7 +100,7 @@ import com.cloud.capacity.CapacityState;
 import com.cloud.capacity.CapacityVO;
 import com.cloud.capacity.dao.CapacityDao;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.ConfigurationManager;
 import com.cloud.configuration.dao.ConfigurationDao;
@@ -1271,14 +1271,14 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
         // TODO Auto-generated method stub
 
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO vo : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost vo : nodeList) {
             if (vo.getMsid() == _serverId) {
                 s_logger.info("Cleaning up storage maintenance jobs associated with Management server: " + vo.getMsid());
                 List<Long> poolIds = _storagePoolWorkDao.searchForPoolIdsForPendingWorkJobs(vo.getMsid());
@@ -1886,7 +1886,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
             throw new InvalidParameterValueException("Cannot delete cache store with staging volumes currently in use!");
         }
 
-        List<TemplateDataStoreVO> templates = this._templateStoreDao.listActiveOnCache(storeId);
+        List<TemplateDataStoreVO> templates = _templateStoreDao.listActiveOnCache(storeId);
         if (templates != null && templates.size() > 0) {
             throw new InvalidParameterValueException("Cannot delete cache store with staging templates currently in use!");
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java b/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
index 2ee2d56..8f0e00e 100644
--- a/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
+++ b/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
@@ -25,7 +25,7 @@ import javax.naming.ConfigurationException;
 
 import com.cloud.cluster.ClusterManager;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 
 @Local(value=VirtualMachineManager.class)
 public class ClusteredVirtualMachineManagerImpl extends VirtualMachineManagerImpl implements ClusterManagerListener {
@@ -37,13 +37,13 @@ public class ClusteredVirtualMachineManagerImpl extends VirtualMachineManagerImp
     }
     
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
         
     }
     
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
             cancelWorkItems(node.getMsid());
         }
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/utils/src/org/apache/cloudstack/config/Configurable.java
----------------------------------------------------------------------
diff --git a/utils/src/org/apache/cloudstack/config/Configurable.java b/utils/src/org/apache/cloudstack/config/Configurable.java
new file mode 100644
index 0000000..3c50eba
--- /dev/null
+++ b/utils/src/org/apache/cloudstack/config/Configurable.java
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.config;
+
+public interface Configurable {
+    ConfigKey<?>[] getConfigKeys();
+}


Mime
View raw message