cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/vmsync to 9e4ebdd
Date Thu, 02 May 2013 05:45:46 GMT
Updated Branches:
  refs/heads/vmsync 17930685e -> 9e4ebdd8b


Add job monitor to help manage system concurrency level


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/9e4ebdd8
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/9e4ebdd8
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/9e4ebdd8

Branch: refs/heads/vmsync
Commit: 9e4ebdd8b377d2af76fac2d12556441226f6502e
Parents: 1793068
Author: Kelven Yang <kelveny@gmail.com>
Authored: Wed May 1 22:45:27 2013 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Wed May 1 22:45:27 2013 -0700

----------------------------------------------------------------------
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |    2 +-
 server/src/com/cloud/async/AsyncJobMonitor.java    |  177 +++++++++++++++
 .../cloudstack/messagebus/SubjectConstants.java    |    4 +
 .../test/com/cloud/async/TestAsyncJobManager.java  |    8 +-
 server/test/resources/AsyncJobTestContext.xml      |    4 +
 5 files changed, 193 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9e4ebdd8/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index e2a65b7..f091d5d 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -71,7 +71,7 @@ import com.cloud.utils.mgmt.JmxUtil;
 import com.cloud.utils.net.MacAddress;
 
 public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener {
-    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName());
+    public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; 	// 3 seconds
 
     private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9e4ebdd8/server/src/com/cloud/async/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobMonitor.java b/server/src/com/cloud/async/AsyncJobMonitor.java
new file mode 100644
index 0000000..4208444
--- /dev/null
+++ b/server/src/com/cloud/async/AsyncJobMonitor.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+import org.apache.cloudstack.messagebus.SubjectConstants;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.component.ManagerBase;
+
+public class AsyncJobMonitor extends ManagerBase {
+    public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
+    
+    @Inject private MessageBus _messageBus;
+	
+	private Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
+	private Timer _timer = new Timer();
+	
+	private volatile int _activePoolThreads = 0;
+	private volatile int _activeInplaceThreads = 0;
+	
+	// configuration
+	private long _inactivityCheckIntervalMs = 60000;
+	private long _inactivityWarningThresholdMs = 90000;
+	
+	public AsyncJobMonitor() {
+	}
+	
+	public long getInactivityCheckIntervalMs() {
+		return _inactivityCheckIntervalMs;
+	}
+	
+	public void setInactivityCheckIntervalMs(long intervalMs) {
+		_inactivityCheckIntervalMs = intervalMs;
+	}
+	
+	public long getInactivityWarningThresholdMs() {
+		return _inactivityWarningThresholdMs;
+	}
+	
+	public void setInactivityWarningThresholdMs(long thresholdMs) {
+		_inactivityWarningThresholdMs = thresholdMs;
+	}
+	
+	@MessageHandler(topic=SubjectConstants.JOB_HEARTBEAT)
+	public void onJobHeartbeatNotify(String subject, String senderAddress, Object args) {
+		if(args != null && args instanceof Long) {
+			synchronized(this) {
+				ActiveTaskRecord record = _activeTasks.get((Long)args);
+				if(record != null) {
+					record.updateJobHeartbeatTick();
+				}
+			}
+		}
+	}
+	
+	private void heartbeat() {
+		synchronized(this) {
+			for(Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
+				if(entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
+					s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+						+ entry.getValue().millisSinceLastJobHeartbeat()/1000 + " seconds");
+				}
+			}
+		}
+	}
+	
+	@Override
+	public boolean configure(String name, Map<String, Object> params)
+			throws ConfigurationException {
+		
+		_messageBus.subscribe(SubjectConstants.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
+		_timer.scheduleAtFixedRate(new TimerTask() {
+
+			@Override
+			public void run() {
+				heartbeat();
+			}
+			
+		}, _inactivityCheckIntervalMs, _inactivityCheckIntervalMs);
+		return true;
+	}
+	
+	public void registerActiveTask(long jobId, long threadId, boolean fromPoolThread) { // records the job as active and bumps the matching thread counter
+		synchronized(this) {
+			assert(_activeTasks.get(jobId) == null);
+			// ActiveTaskRecord's constructor takes (jobId, threadId, fromPoolThread); keep that order so heartbeat warnings name the right job
+			ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
+			_activeTasks.put(jobId, record);
+			if(fromPoolThread)
+				_activePoolThreads++;
+			else
+				_activeInplaceThreads++;
+		}
+	}
+	
+	public void unregisterActiveTask(long jobId) {
+		synchronized(this) {
+			ActiveTaskRecord record = _activeTasks.get(jobId);
+			assert(record != null);
+			if(record != null) {
+				if(record.isPoolThread())
+					_activePoolThreads--;
+				else
+					_activeInplaceThreads--;
+				
+				_activeTasks.remove(jobId);
+			}
+		}
+	}
+	
+	public int getActivePoolThreads() {
+		return _activePoolThreads;
+	}
+	
+	public int getActiveInplaceThread() {
+		return _activeInplaceThreads;
+	}
+	
+	private static class ActiveTaskRecord {
+		long _jobId;
+		long _threadId;
+		boolean _fromPoolThread;
+		long _jobLastHeartbeatTick;
+		
+		public ActiveTaskRecord(long jobId, long threadId, boolean fromPoolThread) {
+			_threadId = threadId;
+			_jobId = jobId;
+			_fromPoolThread = fromPoolThread;
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long getThreadId() {
+			return _threadId;
+		}
+		
+		public long getJobId() {
+			return _jobId;
+		}
+		
+		public boolean isPoolThread() {
+			return _fromPoolThread;
+		}
+		
+		public void updateJobHeartbeatTick() {
+			_jobLastHeartbeatTick = System.currentTimeMillis();
+		}
+		
+		public long millisSinceLastJobHeartbeat() {
+			return System.currentTimeMillis() - _jobLastHeartbeatTick;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9e4ebdd8/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
----------------------------------------------------------------------
diff --git a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java b/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
index 16a9bee..d9bb216 100644
--- a/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
+++ b/server/src/org/apache/cloudstack/messagebus/SubjectConstants.java
@@ -17,5 +17,9 @@
 package org.apache.cloudstack.messagebus;
 
 public interface SubjectConstants {
+	// VM power state messages on message bus
 	public static final String VM_POWER_STATE = "vm.powerstate";
+	
+	// job messages on message bus 
+	public static final String JOB_HEARTBEAT = "job.heartbeat";
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9e4ebdd8/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index e6987ad..2d59161 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -30,6 +30,7 @@ import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import com.cloud.async.AsyncJobManager;
+import com.cloud.async.AsyncJobMonitor;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ComponentContext;
@@ -42,6 +43,7 @@ public class TestAsyncJobManager extends TestCase {
     @Inject AsyncJobManager asyncMgr;
     @Inject ClusterManager clusterMgr;
     @Inject MessageBus messageBus;
+    @Inject AsyncJobMonitor jobMonitor;
     
     @Before                                                  
     public void setUp() {
@@ -72,7 +74,9 @@ public class TestAsyncJobManager extends TestCase {
 			}
 		});
 		thread.start();
-    	
+    
+		jobMonitor.registerActiveTask(1, 1, false);
+		
     	asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() {
     		public boolean checkCondition() {
     			System.out.println("Check condition to exit");
@@ -80,6 +84,8 @@ public class TestAsyncJobManager extends TestCase {
     		}
     	});
     	
+		jobMonitor.unregisterActiveTask(1);
+    	
     	try {
     		thread.join();
     	} catch(InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9e4ebdd8/server/test/resources/AsyncJobTestContext.xml
----------------------------------------------------------------------
diff --git a/server/test/resources/AsyncJobTestContext.xml b/server/test/resources/AsyncJobTestContext.xml
index 54ce0cd..3674f15 100644
--- a/server/test/resources/AsyncJobTestContext.xml
+++ b/server/test/resources/AsyncJobTestContext.xml
@@ -43,6 +43,10 @@
   <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
     <property name="name" value="ApiAsyncJobDispatcher" />
   </bean>
+  <bean id="asyncJobMonitor" class="com.cloud.async.AsyncJobMonitor">
+    <property name="inactivityWarningThresholdMs" value="5" />
+    <property name="InactivityCheckIntervalMs" value="1" />
+  </bean>
   
   <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase" />
 


Mime
View raw message