cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [3/3] git commit: updated refs/heads/master to 1e1ee90
Date Tue, 30 Jul 2013 22:00:51 GMT
Moved over the new jobs framework from vmsync.  This has not been integrated into the server package yet.  Will do that next.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/1e1ee902
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/1e1ee902
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/1e1ee902

Branch: refs/heads/master
Commit: 1e1ee902a21061f7eae94ffabc7a4199934cca3c
Parents: 730d045
Author: Alex Huang <alex.huang@citrix.com>
Authored: Tue Jul 30 15:00:45 2013 -0700
Committer: Alex Huang <alex.huang@citrix.com>
Committed: Tue Jul 30 15:00:58 2013 -0700

----------------------------------------------------------------------
 api/src/org/apache/cloudstack/jobs/JobInfo.java |  81 ++
 .../cloud/cluster/ClusterFenceManagerImpl.java  |   4 +-
 .../cloud/cluster/ClusterManagerListener.java   |   5 +-
 .../async/AsyncCallbackDispatcher.java          |  10 +-
 .../framework/client/ClientEventBus.java        |  31 -
 .../framework/client/ClientMessageBus.java      |  31 +
 .../framework/messagebus/MessageBusBase.java    |  34 +-
 .../framework/messagebus/MessageDetector.java   |  75 ++
 .../framework/server/ServerEventBus.java        |  31 -
 .../framework/server/ServerMessageBus.java      |  31 +
 .../sampleserver/SampleManagementServer.java    |   3 -
 .../cloudstack/messagebus/TestMessageBus.java   |  39 +
 framework/jobs/pom.xml                          |  23 +-
 .../apache/cloudstack/framework/job/Job.java    |  32 -
 .../framework/job/JobInterceptor.java           |  23 -
 .../cloudstack/framework/jobs/AsyncJob.java     | 117 +++
 .../framework/jobs/AsyncJobDispatcher.java      |  28 +
 .../jobs/AsyncJobExecutionContext.java          | 167 ++++
 .../framework/jobs/AsyncJobMBean.java           |  37 +
 .../framework/jobs/AsyncJobManager.java         | 125 +++
 .../jobs/JobCancellationException.java          |  49 +
 .../cloudstack/framework/jobs/Outcome.java      |  62 ++
 .../framework/jobs/dao/AsyncJobDao.java         |  37 +
 .../framework/jobs/dao/AsyncJobDaoImpl.java     | 198 ++++
 .../framework/jobs/dao/AsyncJobJoinMapDao.java  |  46 +
 .../jobs/dao/AsyncJobJoinMapDaoImpl.java        | 303 ++++++
 .../framework/jobs/dao/AsyncJobJournalDao.java  |  27 +
 .../jobs/dao/AsyncJobJournalDaoImpl.java        |  45 +
 .../framework/jobs/dao/SyncQueueDao.java        |  26 +
 .../framework/jobs/dao/SyncQueueDaoImpl.java    |  78 ++
 .../framework/jobs/dao/SyncQueueItemDao.java    |  31 +
 .../jobs/dao/SyncQueueItemDaoImpl.java          | 155 +++
 .../framework/jobs/impl/AsyncJobJoinMapVO.java  | 215 ++++
 .../framework/jobs/impl/AsyncJobJournalVO.java  | 111 +++
 .../framework/jobs/impl/AsyncJobMBeanImpl.java  | 165 ++++
 .../jobs/impl/AsyncJobManagerImpl.java          | 988 +++++++++++++++++++
 .../framework/jobs/impl/AsyncJobMonitor.java    | 185 ++++
 .../framework/jobs/impl/AsyncJobVO.java         | 398 ++++++++
 .../jobs/impl/JobSerializerHelper.java          | 203 ++++
 .../framework/jobs/impl/OutcomeImpl.java        | 124 +++
 .../framework/jobs/impl/SyncQueueItem.java      |  41 +
 .../framework/jobs/impl/SyncQueueItemVO.java    | 143 +++
 .../framework/jobs/impl/SyncQueueManager.java   |  34 +
 .../jobs/impl/SyncQueueManagerImpl.java         | 258 +++++
 .../framework/jobs/impl/SyncQueueVO.java        | 137 +++
 .../manager/ClusteredAgentManagerImpl.java      |   6 +-
 .../com/cloud/async/AsyncJobManagerImpl.java    |   8 +-
 .../cloud/ha/HighAvailabilityManagerImpl.java   |   8 +-
 .../com/cloud/server/LockMasterListener.java    |   8 +-
 .../com/cloud/storage/StorageManagerImpl.java   |  12 +-
 .../vm/ClusteredVirtualMachineManagerImpl.java  |   8 +-
 .../apache/cloudstack/config/Configurable.java  |  21 +
 52 files changed, 4878 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/api/src/org/apache/cloudstack/jobs/JobInfo.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/jobs/JobInfo.java b/api/src/org/apache/cloudstack/jobs/JobInfo.java
new file mode 100644
index 0000000..ac8ffc3
--- /dev/null
+++ b/api/src/org/apache/cloudstack/jobs/JobInfo.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.jobs;
+
+import java.util.Date;
+
+import org.apache.cloudstack.api.Identity;
+import org.apache.cloudstack.api.InternalIdentity;
+
+public interface JobInfo extends Identity, InternalIdentity { // getter-only view of a job record
+    public enum Status { // each constant carries a flag telling whether the job is finished in that state
+        IN_PROGRESS(false),
+        SUCCEEDED(true),
+        FAILED(true),
+        CANCELLED(true);
+        
+        private final boolean done; // value passed by the constant's constructor argument above
+
+        private Status(boolean done) {
+            this.done = done;
+        }
+        
+        public boolean done() { // true for every constant except IN_PROGRESS
+            return done;
+        }
+    }
+
+    String getType();
+
+    String getDispatcher();
+
+    int getPendingSignals();
+
+    long getUserId();
+
+    long getAccountId();
+
+    String getCmd();
+
+    int getCmdVersion();
+
+    String getCmdInfo();
+
+    Status getStatus(); // one of the Status constants declared in this interface
+
+    int getProcessStatus();
+
+    int getResultCode();
+
+    String getResult();
+
+    Long getInitMsid(); // boxed Long, unlike the primitive ids above - presumably nullable; TODO confirm
+
+    Long getExecutingMsid(); // NOTE(review): "Msid" presumably means management server id - confirm
+
+    Long getCompleteMsid();
+
+    Date getCreated();
+
+    Date getLastUpdated();
+
+    Date getLastPolled();
+
+    String getInstanceType();
+
+    Long getInstanceId(); // boxed Long - presumably nullable; TODO confirm
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java
index 7e4922e..5125a07 100644
--- a/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java
+++ b/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java
@@ -43,11 +43,11 @@ public class ClusterFenceManagerImpl extends ManagerBase implements ClusterFence
 	}
 
 	@Override
-	public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 	}
 
 	@Override
-	public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java
----------------------------------------------------------------------
diff --git a/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java
index bcb1736..1231434 100644
--- a/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java
+++ b/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java
@@ -19,7 +19,8 @@ package com.cloud.cluster;
 import java.util.List;
 
 public interface ClusterManagerListener {
-	void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId);
-	void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId);
+    void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId);
+
+    void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId);
 	void onManagementNodeIsolated();
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
index acbc5b6..42cd8c5 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
@@ -22,20 +22,20 @@ package org.apache.cloudstack.framework.async;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-import org.apache.log4j.Logger;
-
-import net.sf.cglib.proxy.CallbackFilter;
 import net.sf.cglib.proxy.Callback;
+import net.sf.cglib.proxy.CallbackFilter;
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
 
+import org.apache.log4j.Logger;
+
 @SuppressWarnings("rawtypes")
 public class AsyncCallbackDispatcher<T, R> implements AsyncCompletionCallback {
     private static final Logger s_logger = Logger.getLogger(AsyncCallbackDispatcher.class);
 
     private Method _callbackMethod;
-	private T _targetObject;
+	private final T _targetObject;
 	private Object _contextObject;
 	private Object _resultObject;
 	private AsyncCallbackDriver _driver = new InplaceAsyncCallbackDriver();
@@ -84,6 +84,7 @@ public class AsyncCallbackDispatcher<T, R> implements AsyncCompletionCallback {
         }
 	    });
 	    en.setCallbackFilter(new CallbackFilter() {
+	        @Override
 	        public int accept(Method method) {
 	            if (method.getParameterTypes().length == 0 && method.getName().equals("finalize")) {
 	                return 1;
@@ -115,6 +116,7 @@ public class AsyncCallbackDispatcher<T, R> implements AsyncCompletionCallback {
 		return (P)_contextObject;
 	}
 
+	@Override
 	public void complete(Object resultObject) {
 		_resultObject = resultObject;
 		_driver.performCompletionCallback(this);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
deleted file mode 100644
index d876b01..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.client;
-
-import org.apache.cloudstack.framework.messagebus.MessageBusBase;
-import org.apache.cloudstack.framework.transport.TransportMultiplexier;
-
-public class ClientEventBus extends MessageBusBase implements TransportMultiplexier {
-
-	@Override
-	public void onTransportMessage(String senderEndpointAddress,
-			String targetEndpointAddress, String multiplexer, String message) {
-		// TODO Auto-generated method stub
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/client/ClientMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientMessageBus.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientMessageBus.java
new file mode 100644
index 0000000..6a510dd
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientMessageBus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.client;
+
+import org.apache.cloudstack.framework.messagebus.MessageBusBase;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+public class ClientMessageBus extends MessageBusBase implements TransportMultiplexier { // MessageBusBase plus an (empty) transport callback
+
+	@Override
+	public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
+		// TODO: not implemented yet - the body is empty, so incoming transport messages are ignored
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index 9cf5e77..a42a604 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -30,10 +30,10 @@ import org.apache.cloudstack.framework.serializer.MessageSerializer;
 
 public class MessageBusBase implements MessageBus {
 
-	private Gate _gate;
-	private List<ActionRecord> _pendingActions;
+	private final Gate _gate;
+	private final List<ActionRecord> _pendingActions;
 	
-	private SubscriptionNode _subscriberRoot;
+	private final SubscriptionNode _subscriberRoot;
 	private MessageSerializer _messageSerializer; 
 	
 	public MessageBusBase() {
@@ -77,7 +77,7 @@ public class MessageBusBase implements MessageBus {
 				if(current != null)
 					current.removeSubscriber(subscriber, false);
 			} else {
-				this._subscriberRoot.removeSubscriber(subscriber, true);
+				_subscriberRoot.removeSubscriber(subscriber, true);
 			}
 			_gate.leave();
 		} else {
@@ -151,11 +151,10 @@ public class MessageBusBase implements MessageBus {
 	private void onGateOpen() {
 		synchronized(_pendingActions) {
 			ActionRecord record = null;
-			if(_pendingActions.size() > 0) {
-				while((record = _pendingActions.remove(0)) != null) {
+            while (_pendingActions.size() > 0) {
+                record = _pendingActions.remove(0);
 					switch(record.getType()) {
-					case Subscribe :
-						{
+                case Subscribe: {
 							SubscriptionNode current = locate(record.getSubject(), null, true);
 							assert(current != null);
 							current.addSubscriber(record.getSubscriber());
@@ -168,7 +167,7 @@ public class MessageBusBase implements MessageBus {
 							if(current != null)
 								current.removeSubscriber(record.getSubscriber(), false);
 						} else {
-							this._subscriberRoot.removeSubscriber(record.getSubscriber(), true);
+                        _subscriberRoot.removeSubscriber(record.getSubscriber(), true);
 						}
 						break;
 					
@@ -188,7 +187,6 @@ public class MessageBusBase implements MessageBus {
 				}
 			}
 		}
-	}
 	
 	private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
 		boolean createPath) {
@@ -223,7 +221,7 @@ public class MessageBusBase implements MessageBus {
 		}
 		
 		if(subjectPathTokens.length > 1) {
-			return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length),
+			return locate(Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length),
 				next, chainFromTop, createPath);
 		} else {
 			return next;
@@ -242,9 +240,9 @@ public class MessageBusBase implements MessageBus {
 	}
 	
 	private static class ActionRecord {
-		private ActionType _type;
-		private String _subject;
-		private MessageSubscriber _subscriber;
+		private final ActionType _type;
+		private final String _subject;
+		private final MessageSubscriber _subscriber;
 		
 		public ActionRecord(ActionType type, String subject, MessageSubscriber subscriber) {
 			_type = type;
@@ -320,10 +318,10 @@ public class MessageBusBase implements MessageBus {
 	}
 	
 	private static class SubscriptionNode {
-		private String _nodeKey;
-		private List<MessageSubscriber> _subscribers;
-		private Map<String, SubscriptionNode> _children;
-		private SubscriptionNode _parent;
+		private final String _nodeKey;
+		private final List<MessageSubscriber> _subscribers;
+		private final Map<String, SubscriptionNode> _children;
+		private final SubscriptionNode _parent;
 		
 		public SubscriptionNode(SubscriptionNode parent, String nodeKey, MessageSubscriber subscriber) {
 			assert(nodeKey != null);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
new file mode 100644
index 0000000..7a7a34a
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messagebus;
+
+public class MessageDetector implements MessageSubscriber { // lets a caller block until something is published on any subscribed subject
+	
+	private MessageBus _messageBus; // bus passed to open(); null until then
+	private String[] _subjects; // subjects passed to open(); null until then
+	
+	private volatile boolean _signalled = false; // set by onPublishMessage(), cleared at the start of every waitAny()
+	
+	public MessageDetector() {
+		_messageBus = null;
+		_subjects = null;
+	}
+	
+	public boolean waitAny(long timeoutInMiliseconds) { // returns the flag value observed after a single wait() call
+		_signalled = false; // NOTE(review): reset is outside the monitor - a publish that arrived before this call is discarded here
+		synchronized(this) {
+			try {
+				wait(timeoutInMiliseconds); // single wait, no loop: a notify, the timeout or a spurious wakeup all end it
+			} catch (InterruptedException e) { // NOTE(review): swallowed - interrupt status is not restored
+			}
+		}
+		return _signalled; // a publish landing after the reset but before wait() is only reported once the wait ends
+	}
+	
+	public void open(MessageBus messageBus, String[] subjects) { // stores the bus and subjects, then subscribes to each subject
+		assert(messageBus != null);
+		assert(subjects != null);
+		
+		_messageBus = messageBus;
+		_subjects = subjects;
+		
+		if(subjects != null) { // still guards the loop when assertions are disabled
+			for(String subject : subjects) {
+				messageBus.subscribe(subject, this);
+			}
+		}
+	}
+	
+	public void close() { // unsubscribes from every stored subject; does nothing if open() never stored any
+		if(_subjects != null) {
+			assert(_messageBus != null);
+			
+			for(String subject : _subjects) {
+				_messageBus.unsubscribe(subject, this);
+			}
+		}
+	}
+
+	@Override
+	public void onPublishMessage(String senderAddress, String subject, Object args) { // arguments are not inspected
+		synchronized(this) {
+			_signalled = true;
+			notifyAll(); // wakes every thread currently blocked in waitAny()
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
deleted file mode 100644
index f3b782d..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.server;
-
-import org.apache.cloudstack.framework.messagebus.MessageBusBase;
-import org.apache.cloudstack.framework.transport.TransportMultiplexier;
-
-public class ServerEventBus extends MessageBusBase implements TransportMultiplexier {
-
-	@Override
-	public void onTransportMessage(String senderEndpointAddress,
-			String targetEndpointAddress, String multiplexer, String message) {
-		// TODO Auto-generated method stub
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/src/org/apache/cloudstack/framework/server/ServerMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerMessageBus.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerMessageBus.java
new file mode 100644
index 0000000..a011468
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerMessageBus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.server;
+
+import org.apache.cloudstack.framework.messagebus.MessageBusBase;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+public class ServerMessageBus extends MessageBusBase implements TransportMultiplexier { // MessageBusBase plus an (empty) transport callback
+
+	@Override
+	public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
+		// TODO: not implemented yet - the body is empty, so incoming transport messages are ignored
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagementServer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagementServer.java b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagementServer.java
index 2a168ac..28eb4ab 100644
--- a/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagementServer.java
+++ b/framework/ipc/test/org/apache/cloudstack/framework/sampleserver/SampleManagementServer.java
@@ -18,9 +18,6 @@
  */
 package org.apache.cloudstack.framework.sampleserver;
 
-import org.springframework.stereotype.Component;
-
-@Component
 public class SampleManagementServer {
 
 	public void mainLoop() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
index dabfdd3..33c5ce5 100644
--- a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -23,6 +23,7 @@ import javax.inject.Inject;
 import junit.framework.TestCase;
 
 import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDetector;
 import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
 import org.junit.Assert;
@@ -113,4 +114,42 @@ public class TestMessageBus extends TestCase {
 		
 		_messageBus.clearAll();
 	}
+	
+	public void testMessageDetector() { // checks that a MessageDetector observes two publishes made from another thread
+		MessageDetector detector = new MessageDetector();
+		detector.open(_messageBus, new String[] {"VM", "Host"});
+		
+		Thread thread = new Thread(new Runnable() { // publisher: two "Host" messages, each preceded by a 3 s sleep
+			@Override
+			public void run() {
+				for(int i = 0; i < 2; i++) {
+					try {
+						Thread.sleep(3000);
+					} catch (InterruptedException e) { // ignored: the publish below still happens
+					}
+					_messageBus.publish(null, "Host", PublishScope.GLOBAL, null);
+				}
+			}
+		});
+		thread.start();
+		
+		try {
+			int count = 0;
+			while(count < 2) { // NOTE(review): no upper bound - loops forever if a publish is never detected
+				if(detector.waitAny(1000)) { // polls in 1 s slices, so timeouts are expected between publishes
+					System.out.println("Detected signal on bus");
+					count++;
+				} else {
+					System.out.println("Waiting timed out");
+				}
+			}
+		} finally {
+			detector.close(); // always unsubscribe, even if the loop throws
+		}
+		
+		try {
+			thread.join();
+		} catch (InterruptedException e) { // ignored: test ends without waiting for the publisher
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/pom.xml
----------------------------------------------------------------------
diff --git a/framework/jobs/pom.xml b/framework/jobs/pom.xml
index cf1fdd5..d5c16e8 100644
--- a/framework/jobs/pom.xml
+++ b/framework/jobs/pom.xml
@@ -28,11 +28,6 @@
   </parent>  
   <dependencies>
     <dependency>
-      <groupId>org.quartz-scheduler</groupId>
-      <artifactId>quartz</artifactId>
-      <version>2.1.6</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.cloudstack</groupId>
       <artifactId>cloud-utils</artifactId>
       <version>${project.version}</version>
@@ -42,8 +37,20 @@
       <artifactId>cloud-api</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-db</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-cluster</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
-  <build>
-    <defaultGoal>install</defaultGoal>
-  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/job/Job.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/job/Job.java b/framework/jobs/src/org/apache/cloudstack/framework/job/Job.java
deleted file mode 100755
index 62ed72f..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/job/Job.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.job;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface Job {
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/job/JobInterceptor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/job/JobInterceptor.java b/framework/jobs/src/org/apache/cloudstack/framework/job/JobInterceptor.java
deleted file mode 100755
index d81077d..0000000
--- a/framework/jobs/src/org/apache/cloudstack/framework/job/JobInterceptor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.job;
-
-public class JobInterceptor {
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
new file mode 100644
index 0000000..61fb396
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.Date;
+
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.jobs.JobInfo;
+
+/**
+ * Framework-side view of an asynchronous job. It re-declares the accessors
+ * inherited from {@link JobInfo} and adds the setters and sync-source
+ * accessors used inside the jobs framework.
+ */
+public interface AsyncJob extends JobInfo {
+
+    /** Outcome category attached to a job journal entry. */
+    enum JournalType {
+        SUCCESS, FAILURE
+    }
+
+    /** Topic name constants for job heartbeat and job state. */
+    interface Topics {
+        String JOB_HEARTBEAT = "job.heartbeat";
+        String JOB_STATE = "job.state";
+    }
+
+    /** Shared constants for job signalling and synchronization. */
+    interface Constants {
+        // A detailed mask could be defined for each individual wakeup event
+        // (periodic timer, matched topic from the message bus, ...), but there
+        // seems to be no need to distinguish them at that level, so only a
+        // single wakeup signal is defined.
+        int SIGNAL_MASK_WAKEUP = 1;
+
+        String SYNC_LOCK_NAME = "SyncLock";
+    }
+
+    // ---- job identity and dispatching ----
+
+    @Override
+    String getType();
+
+    @Override
+    String getDispatcher();
+
+    @Override
+    int getPendingSignals();
+
+    @Override
+    long getUserId();
+
+    @Override
+    long getAccountId();
+
+    // ---- command being executed ----
+
+    @Override
+    String getCmd();
+
+    @Override
+    int getCmdVersion();
+
+    @Override
+    String getCmdInfo();
+
+    // ---- progress and result ----
+
+    @Override
+    Status getStatus();
+
+    @Override
+    int getProcessStatus();
+
+    @Override
+    int getResultCode();
+
+    @Override
+    String getResult();
+
+    // ---- management server ids (msid) ----
+
+    @Override
+    Long getInitMsid();
+
+    void setInitMsid(Long msid);
+
+    @Override
+    Long getExecutingMsid();
+
+    @Override
+    Long getCompleteMsid();
+
+    void setCompleteMsid(Long msid);
+
+    // ---- timestamps ----
+
+    @Override
+    Date getCreated();
+
+    @Override
+    Date getLastUpdated();
+
+    @Override
+    Date getLastPolled();
+
+    // ---- attached instance ----
+
+    @Override
+    String getInstanceType();
+
+    @Override
+    Long getInstanceId();
+
+    // ---- framework-only additions (not part of JobInfo) ----
+
+    String getShortUuid();
+
+    SyncQueueItem getSyncSource();
+
+    void setSyncSource(SyncQueueItem item);
+
+    String getRelated();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobDispatcher.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobDispatcher.java
new file mode 100644
index 0000000..5b0d15d
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobDispatcher.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import com.cloud.utils.component.Adapter;
+
+/**
+ * Dispatcher that runs an {@link AsyncJob}.
+ *
+ * It extends the {@link Adapter} interface for two reasons:
+ * <ol>
+ *   <li>to inherit getName()/setName();</li>
+ *   <li>to conform to the general adapter pattern used across CloudStack.</li>
+ * </ol>
+ */
+public interface AsyncJobDispatcher extends Adapter {
+
+    /**
+     * Runs the given job.
+     *
+     * @param job the job to run
+     */
+    void runJob(AsyncJob job);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
new file mode 100644
index 0000000..0136593
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.ResourceUnavailableException;
+
+/**
+ * Context wrapping the {@link AsyncJob} a piece of code is executing on behalf
+ * of. One context can be bound to the current thread through a thread-local,
+ * and most instance methods simply forward to the {@link AsyncJobManager}
+ * using the id of the wrapped job.
+ *
+ * {@link #init(AsyncJobManager, AsyncJobJoinMapDao)} must be called before any
+ * method that talks to the job manager or the join-map DAO is used.
+ */
+public class AsyncJobExecutionContext {
+    private static AsyncJobManager _jobMgr;
+    private static AsyncJobJoinMapDao _joinMapDao;
+
+    private static final ThreadLocal<AsyncJobExecutionContext> s_currentExecutionContext =
+        new ThreadLocal<AsyncJobExecutionContext>();
+
+    private AsyncJob _job;
+
+    /**
+     * Supplies the collaborators shared by every context instance.
+     *
+     * @param jobMgr job manager all job operations are forwarded to
+     * @param joinMapDao DAO used to look up join records in {@link #disjoinJob(long)}
+     */
+    public static void init(AsyncJobManager jobMgr, AsyncJobJoinMapDao joinMapDao) {
+        _jobMgr = jobMgr;
+        _joinMapDao = joinMapDao;
+    }
+
+    /** Creates a context with no job; set one with {@link #setJob(AsyncJob)}. */
+    public AsyncJobExecutionContext() {
+    }
+
+    /**
+     * Creates a context for the given job.
+     *
+     * @param job job this context wraps
+     */
+    public AsyncJobExecutionContext(AsyncJob job) {
+        _job = job;
+    }
+
+    /** @return the sync source of the wrapped job */
+    public SyncQueueItem getSyncSource() {
+        return _job.getSyncSource();
+    }
+
+    /** Clears the sync source of the wrapped job. */
+    public void resetSyncSource() {
+        _job.setSyncSource(null);
+    }
+
+    public AsyncJob getJob() {
+        return _job;
+    }
+
+    public void setJob(AsyncJob job) {
+        _job = job;
+    }
+
+    /** Forwards to {@link AsyncJobManager#completeAsyncJob} for the wrapped job. */
+    public void completeAsyncJob(JobInfo.Status jobStatus, int resultCode, String resultObject) {
+        assert (_job != null);
+        _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject);
+    }
+
+    /** Forwards to {@link AsyncJobManager#updateAsyncJobStatus} for the wrapped job. */
+    public void updateAsyncJobStatus(int processStatus, String resultObject) {
+        assert (_job != null);
+        _jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject);
+    }
+
+    /** Forwards to {@link AsyncJobManager#updateAsyncJobAttachment} for the wrapped job. */
+    public void updateAsyncJobAttachment(String instanceType, Long instanceId) {
+        assert (_job != null);
+        _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId);
+    }
+
+    /** Forwards to {@link AsyncJobManager#logJobJournal} for the wrapped job. */
+    public void logJobJournal(AsyncJob.JournalType journalType, String journalText, String journalObjJson) {
+        assert (_job != null);
+        _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson);
+    }
+
+    /**
+     * Records the text as a SUCCESS journal entry of the wrapped job and also
+     * writes it to the given logger at debug level.
+     *
+     * @param logger logger that receives the text
+     * @param journalText text to record
+     */
+    public void log(Logger logger, String journalText) {
+        // Same precondition as every other method that dereferences _job.
+        assert (_job != null);
+        _jobMgr.logJobJournal(_job.getId(), AsyncJob.JournalType.SUCCESS, journalText, null);
+        logger.debug(journalText);
+    }
+
+    /** Forwards to {@link AsyncJobManager#joinJob(long, long)} for the wrapped job. */
+    public void joinJob(long joinJobId) {
+        assert (_job != null);
+        _jobMgr.joinJob(_job.getId(), joinJobId);
+    }
+
+    /** Forwards to the wakeup-aware {@code joinJob} of the job manager for the wrapped job. */
+    public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher,
+            String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) {
+        assert (_job != null);
+        _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus,
+            wakeupIntervalInMilliSeconds, timeoutInMilliSeconds);
+    }
+
+    /**
+     * Disjoins the wrapped job from a joined (worker) job and re-throws the
+     * failure the worker recorded, if any.
+     *
+     * The join record is read first, then the jobs are disjoined, and only
+     * then is a recorded failure propagated. This guarantees the disjoin
+     * happens even when the worker failed; previously a failure was thrown
+     * before the disjoin call and the join was left in place.
+     *
+     * TODO : the exception checking is ugly and will become unnecessary after
+     * the switch to full-async mode.
+     *
+     * @param joinedJobId id of the job that was joined
+     * @throws InsufficientCapacityException if the worker recorded this failure
+     * @throws ConcurrentOperationException if the worker recorded this failure
+     * @throws ResourceUnavailableException if the worker recorded this failure
+     * @throws RuntimeException wrapping any other recorded exception
+     */
+    public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
+            ConcurrentOperationException, ResourceUnavailableException {
+        assert (_job != null);
+
+        // Fetch the record before disjoining; it may no longer be available afterwards.
+        AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
+
+        _jobMgr.disjoinJob(_job.getId(), joinedJobId);
+
+        // No record means there is no recorded failure to propagate.
+        if (record != null) {
+            rethrowJoinFailure(record);
+        }
+    }
+
+    /** Re-throws the exception serialized in a FAILED join record; returns normally otherwise. */
+    private static void rethrowJoinFailure(AsyncJobJoinMapVO record) throws InsufficientCapacityException,
+            ConcurrentOperationException, ResourceUnavailableException {
+        if (record.getJoinStatus() != JobInfo.Status.FAILED || record.getJoinResult() == null) {
+            return;
+        }
+
+        Object failure = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
+        if (failure instanceof InsufficientCapacityException) {
+            throw (InsufficientCapacityException)failure;
+        }
+        if (failure instanceof ConcurrentOperationException) {
+            throw (ConcurrentOperationException)failure;
+        }
+        if (failure instanceof ResourceUnavailableException) {
+            throw (ResourceUnavailableException)failure;
+        }
+        if (failure instanceof Exception) {
+            throw new RuntimeException((Exception)failure);
+        }
+    }
+
+    /** Forwards to {@link AsyncJobManager#completeJoin} for the wrapped job. */
+    public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
+        assert (_job != null);
+        _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
+    }
+
+    /**
+     * Completes the join and then completes the wrapped job itself with the
+     * same status, result code 0 and no result object.
+     */
+    public void completeJobAndJoin(JobInfo.Status joinStatus, String joinResult) {
+        assert (_job != null);
+        _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult);
+        _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null);
+    }
+
+    /** @return the context bound to the current thread, or null if none is bound */
+    public static AsyncJobExecutionContext getCurrentExecutionContext() {
+        return s_currentExecutionContext.get();
+    }
+
+    /**
+     * Returns the context bound to the current thread. If none is bound, a new
+     * context wrapping the pseudo job obtained from the job manager is created
+     * and bound first.
+     *
+     * @param accountId account id passed to the job manager when a pseudo job is needed
+     * @param userId user id passed to the job manager when a pseudo job is needed
+     * @return the context now bound to the current thread
+     */
+    public static AsyncJobExecutionContext registerPseudoExecutionContext(long accountId, long userId) {
+        AsyncJobExecutionContext context = s_currentExecutionContext.get();
+        if (context == null) {
+            context = new AsyncJobExecutionContext();
+            context.setJob(_jobMgr.getPseudoJob(accountId, userId));
+            setCurrentExecutionContext(context);
+        }
+
+        return context;
+    }
+
+    /**
+     * Unbinds the context from the current thread.
+     *
+     * @return the context that was bound, or null if none was
+     */
+    public static AsyncJobExecutionContext unregister() {
+        AsyncJobExecutionContext context = s_currentExecutionContext.get();
+        // remove() drops the thread's entry instead of keeping one that maps to null.
+        s_currentExecutionContext.remove();
+        return context;
+    }
+
+    // Declared public, but intended to be used by AsyncJobManagerImpl only.
+    public static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) {
+        s_currentExecutionContext.set(currentContext);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBean.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBean.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBean.java
new file mode 100644
index 0000000..8ba3c87
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobMBean.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+/**
+ * Read-only view of an async job's attributes. Apart from the account/user
+ * ids and the numeric status/result codes, every value is exposed as a String.
+ *
+ * NOTE(review): the MBean suffix suggests this is a JMX management interface;
+ * confirm against the implementing class.
+ */
+public interface AsyncJobMBean {
+    // ---- owner ----
+    long getAccountId();
+    long getUserId();
+
+    // ---- command ----
+    String getCmd();
+    String getCmdInfo();
+
+    // ---- status and result ----
+    String getStatus();
+    int getProcessStatus();
+    int getResultCode();
+    String getResult();
+
+    // ---- attached instance ----
+    String getInstanceType();
+    String getInstanceId();
+
+    // ---- management server and timestamps ----
+    String getInitMsid();
+    String getCreateTime();
+    String getLastUpdateTime();
+    String getLastPollTime();
+
+    // ---- sync queue ----
+    String getSyncQueueId();
+    String getSyncQueueContentType();
+    String getSyncQueueContentId();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
new file mode 100644
index 0000000..bc06101
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.utils.Predicate;
+import com.cloud.utils.component.Manager;
+
+/**
+ * Manager interface of the async jobs framework: submitting and completing
+ * jobs, updating their status, journaling, and joining/disjoining an upper
+ * level job with a down-level job.
+ */
+public interface AsyncJobManager extends Manager {
+
+    String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+
+    AsyncJobVO getAsyncJob(long jobId);
+
+    List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
+
+    long submitAsyncJob(AsyncJob job);
+
+    long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
+
+    void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, String result);
+
+    void updateAsyncJobStatus(long jobId, int processStatus, String resultObject);
+
+    void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
+
+    void logJobJournal(long jobId, AsyncJob.JournalType journalType, String journalText,
+            String journalObjJson);
+
+    /**
+     * A running thread inside the management server can have a pseudo job
+     * linked to it 1:1. This helps some legacy code work without too
+     * dramatic changes.
+     *
+     * All pseudo jobs should be expunged upon the management start event.
+     *
+     * @param accountId account id for the pseudo job
+     * @param userId user id for the pseudo job
+     * @return pseudo job for the thread
+     */
+    AsyncJob getPseudoJob(long accountId, long userId);
+
+    /**
+     * Used by an upper level job to wait synchronously for the completion of
+     * a down-level job (usually VmWork jobs). The caller needs to use
+     * waitAndCheck() to check the completion status of the down-level job.
+     *
+     * Because of the amount of legacy code that relies on synchronous-call
+     * semantics, this is the form of joinJob that is used mostly.
+     *
+     * @param jobId upper job that is going to wait for the completion of a down-level job
+     * @param joinJobId down-level job
+     */
+    void joinJob(long jobId, long joinJobId);
+
+    /**
+     * Used by an upper level job to wait asynchronously for the completion of
+     * a down-level job (usually VmWork jobs). It causes the upper job to cease
+     * its current execution; the upper job is rescheduled to execute
+     * periodically or on wakeup events detected from the message bus.
+     *
+     * @param jobId upper job that is going to wait for the completion of a down-level job
+     * @param joinJobId down-level job
+     * @param wakeupHandler wake-up handler
+     * @param wakupDispatcher wake-up dispatcher
+     * @param wakeupTopicsOnMessageBus message bus topics that wake up the upper job
+     * @param wakeupIntervalInMilliSeconds interval of the periodic wakeup, in milliseconds
+     * @param timeoutInMilliSeconds timeout, in milliseconds
+     */
+    void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher,
+            String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds);
+
+    /**
+     * Dis-joins two related jobs.
+     *
+     * @param jobId upper level job
+     * @param joinedJobId job that was joined
+     */
+    void disjoinJob(long jobId, long joinedJobId);
+
+    /**
+     * Used by a down-level job to notify upper level jobs of its completion.
+     *
+     * @param joinJobId down-level job for upper level jobs to join with
+     * @param joinStatus status indicating success or failure of the down-level job
+     * @param joinResult object-stream serialized result object. This is primarily
+     *            used by the down-level job to pass error exception objects for
+     *            legacy code to work. To make passing exception objects easier,
+     *            object-stream based serialization is used instead of GSON.
+     */
+    void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult);
+
+    void releaseSyncSource();
+
+    void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
+
+    /**
+     * This method will be deprecated after all code has been migrated to the
+     * fully-asynchronous mode that uses the async feature of joinJob/disjoinJob.
+     *
+     * @param job job the wait is performed for (presumably the waiting job; confirm with the implementation)
+     * @param wakupTopicsOnMessageBus topics on the message bus that wake up the wait
+     * @param checkIntervalInMilliSeconds time after which the wait is interrupted to check the predicate condition
+     * @param timeoutInMiliseconds timeout after which the whole wait process is abandoned
+     * @param predicate condition to check
+     * @return true if the predicate condition is satisfied, false if the wait timed out
+     */
+    boolean waitAndCheck(AsyncJob job, String[] wakupTopicsOnMessageBus, long checkIntervalInMilliSeconds,
+            long timeoutInMiliseconds, Predicate predicate);
+
+    AsyncJob queryJob(long jobId, boolean updatePollTime);
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java
new file mode 100644
index 0000000..28c1e5b
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.concurrent.CancellationException;
+
+import com.cloud.utils.SerialVersionUID;
+
+
+/**
+ * This exception is fired when the job has been cancelled
+ *
+ */
+public class JobCancellationException extends CancellationException {
+    
+    private static final long serialVersionUID = SerialVersionUID.AffinityConflictException;
+
+    public enum Reason {
+        RequestedByUser,
+        RequestedByCaller,
+        TimedOut;
+    }
+
+    Reason reason;
+
+    public JobCancellationException(Reason reason) {
+        super("The job was cancelled due to " + reason.toString());
+        this.reason = reason;
+    }
+
+    public Reason getReason() {
+        return reason;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java
new file mode 100644
index 0000000..b400b71
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Outcome is returned by clients of the jobs framework as a way to wait for
+ * the outcome of a job. It fully complies with how the {@link Future}
+ * interface is designed. In addition, it allows a {@link Task} to be filed
+ * that is scheduled when the job completes.
+ *
+ * Note that a Task should schedule a job rather than try to complete the work
+ * in the schedule code itself, as that would take up threads in the jobs
+ * framework.
+ *
+ * Clients of the jobs framework can either use the OutcomeImpl class to
+ * implement this interface, or extend this interface so that their specific
+ * exceptions can be thrown.
+ *
+ * @param <T> Object returned when the job completes
+ */
+public interface Outcome<T> extends Future<T> {
+
+    /** @return the job this outcome belongs to */
+    AsyncJob getJob();
+
+    /**
+     * In addition to the normal Future methods, Outcome allows a task to be
+     * registered that is performed when the job is completed.
+     *
+     * @param task task to schedule on completion
+     */
+    void execute(Task<T> task);
+
+    /**
+     * Variant of {@link #execute(Task)} that also takes a wait time.
+     *
+     * @param task task to schedule on completion
+     * @param wait amount of time, in the given unit (presumably the maximum
+     *            time to wait for the job; confirm with the implementation)
+     * @param unit unit of {@code wait}
+     */
+    void execute(Task<T> task, long wait, TimeUnit unit);
+
+    /**
+     * Task is used by Outcome to schedule work to run when a job completes.
+     *
+     * @param <T> T result returned
+     */
+    interface Task<T> extends Runnable {
+        void schedule(AsyncJobExecutionContext context, T result);
+
+        void scheduleOnError(AsyncJobExecutionContext context, Throwable e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
new file mode 100644
index 0000000..cfcd173
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * DAO for {@link AsyncJobVO} records, keyed by a Long id.
+ *
+ * NOTE(review): no implementation is visible here, so the grouping comments
+ * below are derived from the method names only; confirm against AsyncJobDaoImpl.
+ */
+public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
+
+    // ---- pending jobs attached to an instance ----
+    AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
+
+    List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId);
+
+    // ---- pseudo jobs ----
+    AsyncJobVO findPseudoJob(long threadId, long msid);
+
+    // The method name is misspelled ("Pseduo"); it is kept as-is because
+    // implementations and callers depend on it.
+    void cleanupPseduoJobs(long msid);
+
+    // ---- expired jobs, selected by a cut-off time and a row limit ----
+    List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
+
+    List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit);
+
+    List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
+
+    // ---- job reset for a management server (msid) ----
+    void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
new file mode 100644
index 0000000..fb3845c
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+
+public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
+    private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
+	
+	private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
+	private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
+    private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;
+	private final SearchBuilder<AsyncJobVO> pseudoJobSearch;
+	private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch;
+	private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch;
+	private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
+
+	
+	/**
+	 * Prepares the search templates used by the query methods of this DAO.
+	 * Only the conditions are declared here; the values are bound per call
+	 * through {@code SearchCriteria.setParameters}.
+	 */
+	public AsyncJobDaoImpl() {
+		// instanceType = ? AND instanceId = ? AND status = ?
+		pendingAsyncJobSearch = createSearchBuilder();
+		pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(), Op.EQ);
+		pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(), Op.EQ);
+		pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(), Op.EQ);
+		pendingAsyncJobSearch.done();
+
+		// created <= ?
+		expiringAsyncJobSearch = createSearchBuilder();
+		expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(), Op.LTEQ);
+		expiringAsyncJobSearch.done();
+
+		// instanceType = ? AND accountId = ? AND status = ?
+		pendingAsyncJobsSearch = createSearchBuilder();
+		pendingAsyncJobsSearch.and("instanceType", pendingAsyncJobsSearch.entity().getInstanceType(), Op.EQ);
+		pendingAsyncJobsSearch.and("accountId", pendingAsyncJobsSearch.entity().getAccountId(), Op.EQ);
+		pendingAsyncJobsSearch.and("status", pendingAsyncJobsSearch.entity().getStatus(), Op.EQ);
+		pendingAsyncJobsSearch.done();
+
+		// created <= ? AND completeMsid IS NULL AND status = ?
+		expiringUnfinishedAsyncJobSearch = createSearchBuilder();
+		expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(), Op.LTEQ);
+		expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), Op.NULL);
+		expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), Op.EQ);
+		expiringUnfinishedAsyncJobSearch.done();
+
+		// created <= ? AND completeMsid IS NOT NULL AND status != ?
+		expiringCompletedAsyncJobSearch = createSearchBuilder();
+		expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(), Op.LTEQ);
+		expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), Op.NNULL);
+		expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), Op.NEQ);
+		expiringCompletedAsyncJobSearch.done();
+
+		// dispatcher = ? AND instanceType = ? AND instanceId = ?
+		pseudoJobSearch = createSearchBuilder();
+		pseudoJobSearch.and("jobDispatcher", pseudoJobSearch.entity().getDispatcher(), Op.EQ);
+		pseudoJobSearch.and("instanceType", pseudoJobSearch.entity().getInstanceType(), Op.EQ);
+		pseudoJobSearch.and("instanceId", pseudoJobSearch.entity().getInstanceId(), Op.EQ);
+		pseudoJobSearch.done();
+
+		// initMsid = ?
+		pseudoJobCleanupSearch = createSearchBuilder();
+		pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ);
+		pseudoJobCleanupSearch.done();
+	}
+	
+	/**
+	 * Returns an in-progress job for the given instance, or {@code null} when
+	 * the query yields nothing. If several rows match, a warning is logged and
+	 * the first row of the result is returned.
+	 */
+	@Override
+	public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
+		SearchCriteria<AsyncJobVO> criteria = pendingAsyncJobSearch.create();
+		criteria.setParameters("instanceType", instanceType);
+		criteria.setParameters("instanceId", instanceId);
+		criteria.setParameters("status", JobInfo.Status.IN_PROGRESS);
+
+		List<AsyncJobVO> pending = listIncludingRemovedBy(criteria);
+		if (pending == null || pending.size() == 0) {
+			return null;
+		}
+
+		if (pending.size() > 1) {
+			s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
+		}
+		return pending.get(0);
+	}
+	
+	/**
+	 * Lists in-progress jobs for the given instance type. The account id is
+	 * only bound on the search criteria when it is non-null.
+	 */
+	@Override
+	public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
+		SearchCriteria<AsyncJobVO> criteria = pendingAsyncJobsSearch.create();
+		criteria.setParameters("instanceType", instanceType);
+		if (accountId != null) {
+			criteria.setParameters("accountId", accountId);
+		}
+		criteria.setParameters("status", JobInfo.Status.IN_PROGRESS);
+
+		return listBy(criteria);
+	}
+	
+	/**
+	 * Looks up the pseudo job whose instance id equals the given thread id,
+	 * using the pseudo dispatcher and pseudo instance type constants of
+	 * {@link AsyncJobVO}. Returns {@code null} when nothing matches.
+	 *
+	 * NOTE(review): {@code msid} is accepted but never used in the query -
+	 * confirm whether the lookup should also be restricted by management server.
+	 */
+	@Override
+	public AsyncJobVO findPseudoJob(long threadId, long msid) {
+		SearchCriteria<AsyncJobVO> criteria = pseudoJobSearch.create();
+		criteria.setParameters("jobDispatcher", AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+		criteria.setParameters("instanceType", AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
+		criteria.setParameters("instanceId", threadId);
+
+		List<AsyncJobVO> matches = listBy(criteria);
+		if (matches == null || matches.size() == 0) {
+			return null;
+		}
+
+		// At most one pseudo job is expected for these criteria.
+		assert (matches.size() == 1);
+		return matches.get(0);
+	}
+	
+	/**
+	 * Expunges every job whose initiating management server id equals {@code msid}.
+	 */
+	@Override
+	public void cleanupPseduoJobs(long msid) {
+		SearchCriteria<AsyncJobVO> criteria = pseudoJobCleanupSearch.create();
+		criteria.setParameters("initMsid", msid);
+		expunge(criteria);
+	}
+	
+    @Override
+    public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
+        SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
+        sc.setParameters("created", cutTime);
+        Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
+        return listIncludingRemovedBy(sc, filter);
+    }
+    
+    @Override
+	public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
+		SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
+		sc.setParameters("created", cutTime);
+        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+		Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
+		return listIncludingRemovedBy(sc, filter);
+	}
+	
+	@Override
+	public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit) {
+		SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
+		sc.setParameters("created", cutTime);
+        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+		Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
+		return listIncludingRemovedBy(sc, filter);
+	}
+
+	/**
+	 * Marks as failed every in-progress job that is executing on the given
+	 * management server, or that was initiated by it and has no executing
+	 * server recorded, storing the supplied result code and message.
+	 *
+	 * Failures are logged at warn level and not propagated.
+	 *
+	 * @param msid management server id whose jobs are reset
+	 * @param jobResultCode result code written to the affected jobs
+	 * @param jobResultMessage result text written to the affected jobs
+	 */
+	@Override
+	@DB
+	public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
+		// Every value is bound as a parameter. The result message is free text, so
+		// splicing it into the statement allowed SQL injection and broke the update
+		// whenever the message contained a single quote.
+		String sql = "UPDATE async_job SET job_status=?, job_result_code=?, job_result=?"
+				+ " where job_status=?"
+				+ " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
+
+		Transaction txn = Transaction.currentTxn();
+		PreparedStatement pstmt = null;
+		try {
+			pstmt = txn.prepareAutoCloseStatement(sql);
+			pstmt.setInt(1, JobInfo.Status.FAILED.ordinal());
+			pstmt.setInt(2, jobResultCode);
+			// String.valueOf keeps the previous outcome for a null message: the
+			// concatenated statement stored the text "null" in that case.
+			pstmt.setString(3, String.valueOf(jobResultMessage));
+			pstmt.setInt(4, JobInfo.Status.IN_PROGRESS.ordinal());
+			pstmt.setLong(5, msid);
+			pstmt.setLong(6, msid);
+			pstmt.execute();
+		} catch (SQLException e) {
+			s_logger.warn("Unable to reset job status for management server " + msid, e);
+		} catch (Throwable e) {
+			s_logger.warn("Unable to reset job status for management server " + msid, e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
new file mode 100644
index 0000000..577ed10
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * Data access operations for {@link AsyncJobJoinMapVO} records, which relate
+ * a job ({@code jobId}) to a job it has joined ({@code joinJobId}).
+ */
+public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> {
+
+    /** Creates a join record between {@code jobId} and {@code joinJobId} and returns a {@code Long} for it. */
+    Long joinJob(long jobId, long joinJobId, long joinMsid, long wakeupIntervalMs, long expirationMs,
+            Long syncSourceId, String wakeupHandler, String wakeupDispatcher);
+
+    /** Removes the join between {@code jobId} and {@code joinedJobId}. */
+    void disjoinJob(long jobId, long joinedJobId);
+
+    /** Removes every join owned by {@code jobId}. */
+    void disjoinAllJobs(long jobId);
+
+    /** Fetches the join record for the given pair of job ids. */
+    AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
+
+    /** Lists the join records owned by {@code jobId}. */
+    List<AsyncJobJoinMapVO> listJoinRecords(long jobId);
+
+    /** Records the completion status, result and completing management server for joins on {@code joinJobId}. */
+    void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid);
+
+//	List<Long> wakeupScan();
+
+    /** Finds ids of jobs to wake for the given joined job. */
+    List<Long> findJobsToWake(long joinedJobId);
+
+    /** Finds ids of jobs to wake relative to the given cut-off date. */
+    List<Long> findJobsToWakeBetween(Date cutDate);
+//	List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
new file mode 100644
index 0000000..20d8ba6
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.UpdateBuilder;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobJoinMapDaoImpl.class);
+	
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;
+	private final SearchBuilder<AsyncJobJoinMapVO> RecordSearchByOwner;
+	private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;
+	private final SearchBuilder<AsyncJobJoinMapVO> WakeupSearch;
+
+//    private final GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobSearch;
+	
+    /**
+     * Prepares the search templates used by this DAO; values are bound per
+     * call through {@code SearchCriteria.setParameters}.
+     */
+    protected AsyncJobJoinMapDaoImpl() {
+        // jobId = ? AND joinJobId = ?
+        RecordSearch = createSearchBuilder();
+        RecordSearch.and("jobId", RecordSearch.entity().getJobId(), Op.EQ);
+        RecordSearch.and("joinJobId", RecordSearch.entity().getJoinJobId(), Op.EQ);
+        RecordSearch.done();
+
+        // jobId = ?
+        RecordSearchByOwner = createSearchBuilder();
+        RecordSearchByOwner.and("jobId", RecordSearchByOwner.entity().getJobId(), Op.EQ);
+        RecordSearchByOwner.done();
+
+        // joinJobId = ?
+        CompleteJoinSearch = createSearchBuilder();
+        CompleteJoinSearch.and("joinJobId", CompleteJoinSearch.entity().getJoinJobId(), Op.EQ);
+        CompleteJoinSearch.done();
+
+        // nextWakeupTime < ? AND expiration > ? AND joinStatus = ?
+        WakeupSearch = createSearchBuilder();
+        WakeupSearch.and("nextWakeupTime", WakeupSearch.entity().getNextWakeupTime(), Op.LT);
+        WakeupSearch.and("expiration", WakeupSearch.entity().getExpiration(), Op.GT);
+        WakeupSearch.and("joinStatus", WakeupSearch.entity().getJoinStatus(), Op.EQ);
+        WakeupSearch.done();
+
+//        JoinJobSearch = createSearchBuilder(Long.class);
+//        JoinJobSearch.and(JoinJobSearch.entity().getJoinJobId(), Op.SC, "joinJobId");
+//        JoinJobSearch.done();
+    }
+	
+	@Override
+    public Long joinJob(long jobId, long joinJobId, long joinMsid,
+		long wakeupIntervalMs, long expirationMs,
+		Long syncSourceId, String wakeupHandler, String wakeupDispatcher) {
+		
+		AsyncJobJoinMapVO record = new AsyncJobJoinMapVO();
+		record.setJobId(jobId);
+		record.setJoinJobId(joinJobId);
+		record.setJoinMsid(joinMsid);
+		record.setJoinStatus(JobInfo.Status.IN_PROGRESS);
+		record.setSyncSourceId(syncSourceId);
+		record.setWakeupInterval(wakeupIntervalMs / 1000);		// convert millisecond to second
+		record.setWakeupHandler(wakeupHandler);
+		record.setWakeupDispatcher(wakeupDispatcher);
+		if(wakeupHandler != null) {
+			record.setNextWakeupTime(new Date(DateUtil.currentGMTTime().getTime() + wakeupIntervalMs));
+			record.setExpiration(new Date(DateUtil.currentGMTTime().getTime() + expirationMs));
+		}
+		
+		persist(record);
+		return record.getId();
+	}
+	
+	/**
+	 * Expunges the join record(s) matching both {@code jobId} and {@code joinedJobId}.
+	 */
+	@Override
+	public void disjoinJob(long jobId, long joinedJobId) {
+		SearchCriteria<AsyncJobJoinMapVO> criteria = RecordSearch.create();
+		criteria.setParameters("jobId", jobId);
+		criteria.setParameters("joinJobId", joinedJobId);
+		expunge(criteria);
+	}
+	
+	/**
+	 * Expunges every join record whose owner is {@code jobId}.
+	 */
+	@Override
+	public void disjoinAllJobs(long jobId) {
+		SearchCriteria<AsyncJobJoinMapVO> criteria = RecordSearchByOwner.create();
+		criteria.setParameters("jobId", jobId);
+		expunge(criteria);
+	}
+	
+	/**
+	 * Returns the join record for the given pair of job ids, or {@code null}
+	 * when no such record is found.
+	 */
+	@Override
+	public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
+		SearchCriteria<AsyncJobJoinMapVO> criteria = RecordSearch.create();
+		criteria.setParameters("jobId", jobId);
+		criteria.setParameters("joinJobId", joinJobId);
+
+		List<AsyncJobJoinMapVO> matches = listBy(criteria);
+		if (matches == null || matches.size() == 0) {
+			return null;
+		}
+
+		// A (jobId, joinJobId) pair is expected to identify at most one record.
+		assert (matches.size() == 1);
+		return matches.get(0);
+	}
+	
+	/**
+	 * Lists every join record whose owner is {@code jobId}.
+	 */
+	@Override
+	public List<AsyncJobJoinMapVO> listJoinRecords(long jobId) {
+		SearchCriteria<AsyncJobJoinMapVO> criteria = RecordSearchByOwner.create();
+		criteria.setParameters("jobId", jobId);
+		return listBy(criteria);
+	}
+	
+    /**
+     * Updates the records that join on {@code joinJobId} with the given
+     * status, result and completing management server id, and stamps them
+     * with the current GMT time as last-updated.
+     */
+    @Override
+    public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid) {
+        // Collect the changed columns on an update-only instance.
+        AsyncJobJoinMapVO changes = createForUpdate();
+        changes.setJoinStatus(joinStatus);
+        changes.setJoinResult(joinResult);
+        changes.setCompleteMsid(completeMsid);
+        changes.setLastUpdated(DateUtil.currentGMTTime());
+        UpdateBuilder updateBuilder = getUpdateBuilder(changes);
+
+        SearchCriteria<AsyncJobJoinMapVO> criteria = CompleteJoinSearch.create();
+        criteria.setParameters("joinJobId", joinJobId);
+
+        update(updateBuilder, criteria, null);
+    }
+
+//	@Override
+//    public List<Long> wakeupScan() {
+//		List<Long> standaloneList = new ArrayList<Long>();
+//
+//		Date cutDate = DateUtil.currentGMTTime();
+//
+//		Transaction txn = Transaction.currentTxn();
+//        PreparedStatement pstmt = null;
+//        try {
+//			txn.start();
+//
+//			//
+//			// performance sensitive processing, do it in plain SQL
+//			//
+//			String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
+//					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+//			pstmt = txn.prepareStatement(sql);
+//			pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+//	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.executeUpdate();
+//	        pstmt.close();
+//
+//			sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+//					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+//			pstmt = txn.prepareStatement(sql);
+//	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.executeUpdate();
+//	        pstmt.close();
+//
+//	        sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+//			pstmt = txn.prepareStatement(sql);
+//	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        ResultSet rs = pstmt.executeQuery();
+//	        while(rs.next()) {
+//	        	standaloneList.add(rs.getLong(1));
+//	        }
+//	        rs.close();
+//	        pstmt.close();
+//
+//	        // update for next wake-up
+//	        sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
+//			pstmt = txn.prepareStatement(sql);
+//	        pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//	        pstmt.executeUpdate();
+//	        pstmt.close();
+//
+//	        txn.commit();
+//		} catch (SQLException e) {
+//			s_logger.error("Unexpected exception", e);
+//		}
+//
+//        return standaloneList;
+//	}
+
+    /**
+     * Returns the ids of jobs that joined {@code joinedJobId} and that have no
+     * entry in {@code sync_queue_item}.
+     *
+     * @param joinedJobId id of the job that other jobs have joined
+     * @return ids of the matching jobs, possibly empty
+     * @throws CloudRuntimeException if the query fails
+     */
+    @Override
+    public List<Long> findJobsToWake(long joinedJobId) {
+        // TODO: We should fix this.  We shouldn't be crossing daos in a dao code.
+        List<Long> standaloneList = new ArrayList<Long>();
+        Transaction txn = Transaction.currentTxn();
+        String sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+        try {
+            PreparedStatement pstmt = txn.prepareStatement(sql);
+            try {
+                pstmt.setLong(1, joinedJobId);
+                ResultSet rs = pstmt.executeQuery();
+                while (rs.next()) {
+                    standaloneList.add(rs.getLong(1));
+                }
+            } finally {
+                // Closing the statement also releases its ResultSet; previously
+                // neither was ever closed.
+                pstmt.close();
+            }
+        } catch (SQLException e) {
+            throw new CloudRuntimeException("Unable to execute " + sql, e);
+        }
+        return standaloneList;
+    }
+
+    /**
+     * Returns the ids of jobs whose join record has {@code next_wakeup} before
+     * and {@code expiration} after {@code cutDate} and that have no entry in
+     * {@code sync_queue_item}, then advances {@code next_wakeup} by
+     * {@code wakeup_interval} seconds on every join record inside that window.
+     *
+     * The cut-off is passed to both statements as a GMT-formatted string.
+     *
+     * @param cutDate cut-off compared against next_wakeup and expiration
+     * @return ids of the matching jobs, possibly empty
+     * @throws CloudRuntimeException if either statement fails
+     */
+    @Override
+    public List<Long> findJobsToWakeBetween(Date cutDate) {
+        List<Long> standaloneList = new ArrayList<Long>();
+        // The same formatted cut-off fills all four placeholders; format it once.
+        String cutDateGmt = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate);
+        Transaction txn = Transaction.currentTxn();
+        try {
+            String sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+            PreparedStatement pstmt = txn.prepareStatement(sql);
+            try {
+                pstmt.setString(1, cutDateGmt);
+                pstmt.setString(2, cutDateGmt);
+                ResultSet rs = pstmt.executeQuery();
+                while (rs.next()) {
+                    standaloneList.add(rs.getLong(1));
+                }
+            } finally {
+                // Closing the statement also releases its ResultSet; previously this
+                // statement was leaked when the variable was reassigned below.
+                pstmt.close();
+            }
+
+            // update for next wake-up
+            sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
+            pstmt = txn.prepareStatement(sql);
+            try {
+                pstmt.setString(1, cutDateGmt);
+                pstmt.setString(2, cutDateGmt);
+                pstmt.executeUpdate();
+            } finally {
+                pstmt.close();
+            }
+
+            return standaloneList;
+        } catch (SQLException e) {
+            throw new CloudRuntimeException("Unable to handle SQL exception", e);
+        }
+    }
+	
+//    @Override
+//    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+//        List<Long> standaloneList = new ArrayList<Long>();
+//
+//        Transaction txn = Transaction.currentTxn();
+//        PreparedStatement pstmt = null;
+//        try {
+//            txn.start();
+//
+//            //
+//            // performance sensitive processing, do it in plain SQL
+//            //
+//            String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
+//                    "(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+//            pstmt = txn.prepareStatement(sql);
+//            pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+//            pstmt.setLong(2, joinedJobId);
+//            pstmt.executeUpdate();
+//            pstmt.close();
+//
+//            sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+//                    "(SELECT job_id FROM async_job_join_map WHERE join_job_id = ?)";
+//            pstmt = txn.prepareStatement(sql);
+//            pstmt.setLong(1, joinedJobId);
+//            pstmt.executeUpdate();
+//            pstmt.close();
+//
+//            sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+//            pstmt = txn.prepareStatement(sql);
+//            pstmt.setLong(1, joinedJobId);
+//            ResultSet rs = pstmt.executeQuery();
+//            while(rs.next()) {
+//                standaloneList.add(rs.getLong(1));
+//            }
+//            rs.close();
+//            pstmt.close();
+//
+//            txn.commit();
+//        } catch (SQLException e) {
+//            s_logger.error("Unexpected exception", e);
+//        }
+//
+//        return standaloneList;
+//    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
new file mode 100644
index 0000000..fb6a242
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDao.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
+
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * Data access operations for {@link AsyncJobJournalVO} records.
+ */
+public interface AsyncJobJournalDao extends GenericDao<AsyncJobJournalVO, Long> {
+
+    /** Lists the journal entries recorded for the given job id. */
+    List<AsyncJobJournalVO> getJobJournal(long jobId);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1e1ee902/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
new file mode 100644
index 0000000..d26e6ed
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJournalDaoImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.framework.jobs.dao;
+
+import java.util.List;
+
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
+
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+
+/**
+ * {@link AsyncJobJournalDao} implementation on top of {@link GenericDaoBase}.
+ */
+public class AsyncJobJournalDaoImpl extends GenericDaoBase<AsyncJobJournalVO, Long> implements AsyncJobJournalDao {
+
+    /** Search template matching journal entries by job id. */
+    private final SearchBuilder<AsyncJobJournalVO> jobJournalSearch;
+
+    /** Prepares the by-job-id search template. */
+    public AsyncJobJournalDaoImpl() {
+        jobJournalSearch = createSearchBuilder();
+        jobJournalSearch.and("jobId", jobJournalSearch.entity().getJobId(), Op.EQ);
+        jobJournalSearch.done();
+    }
+
+    /** Lists every journal entry whose job id equals {@code jobId}. */
+    @Override
+    public List<AsyncJobJournalVO> getJobJournal(long jobId) {
+        SearchCriteria<AsyncJobJournalVO> criteria = jobJournalSearch.create();
+        criteria.setParameters("jobId", jobId);
+        return listBy(criteria);
+    }
+}


Mime
View raw message