incubator-cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: Add server side transport driver
Date Wed, 12 Dec 2012 19:45:20 GMT
Updated Branches:
  refs/heads/javelin f52950689 -> 11e9baca3


Add server side transport driver


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/11e9baca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/11e9baca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/11e9baca

Branch: refs/heads/javelin
Commit: 11e9baca371a962d397a84fceb20d25007082e04
Parents: f529506
Author: Kelven Yang <kelveny@gmail.com>
Authored: Wed Dec 12 11:44:53 2012 -0800
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Wed Dec 12 11:44:53 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/TransportEndpointSite.java |   33 ++++++++-
 .../framework/messaging/TransportProvider.java     |    2 +
 .../messaging/client/ClientTransportProvider.java  |    5 ++
 .../messaging/server/ServerTransportProvider.java  |   52 ++++++++++++++-
 4 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
index ca6155b..82ed9f5 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
@@ -24,18 +24,25 @@ import java.util.List;
 import java.util.Map;
 
 public class TransportEndpointSite {
+	private TransportProvider _provider;
 	private TransportEndpoint _endpoint;
 	private TransportAddress _address;
 	
 	private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
 	private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();  
 	
-	public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
+	private int _outstandingSignalRequests;
+	
+	public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, TransportAddress address) {
+		assert(provider != null);
 		assert(endpoint != null);
 		assert(address != null);
 		
+		_provider = provider;
 		_endpoint = endpoint;
 		_address = address;
+		
+		_outstandingSignalRequests = 0;
 	}
 	
 	public TransportEndpoint getEndpoint() {
@@ -68,7 +75,7 @@ public class TransportEndpointSite {
 			_outputQueue.add(pdu);
 		}
 		
-		processOutput();
+		signalOutputProcessRequest();
 	}
 	
 	public TransportPdu getNextOutputPdu() {
@@ -80,7 +87,7 @@ public class TransportEndpointSite {
 		return null;
 	}
 	
-	private void processOutput() {
+	public void processOutput() {
 		TransportPdu pdu;
 		TransportEndpoint endpoint = getEndpoint();
 
@@ -104,4 +111,24 @@ public class TransportEndpointSite {
 		
 		return multiplexier;
 	}
+	
+	private void signalOutputProcessRequest() {
+		boolean proceed = false;
+		synchronized(this) {
+			if(_outstandingSignalRequests == 0) {
+				_outstandingSignalRequests++;
+				proceed = true;
+			}
+		}
+		
+		if(proceed)
+			_provider.requestSiteOutput(this);
+	}
+	
+	public void ackOutputProcessSignal() {
+		synchronized(this) {
+			assert(_outstandingSignalRequests == 1);
+			_outstandingSignalRequests--;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
index bdbdd17..e25407f 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
@@ -22,6 +22,8 @@ public interface TransportProvider {
 	TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
 	boolean detach(TransportEndpoint endpoint);
 	
+	void requestSiteOutput(TransportEndpointSite site);
+	
 	void sendMessage(String soureEndpointAddress, String targetEndpointAddress, 
 		String multiplexier, String message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
index 60c07c3..c2bbef7 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
@@ -38,6 +38,11 @@ public class ClientTransportProvider implements TransportProvider {
 	}
 	
 	@Override
+	public void requestSiteOutput(TransportEndpointSite site) {
+		// ???
+	}
+	
+	@Override
 	public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, 
 		String multiplexier, String message) {
 		// TODO

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
index 3372b75..014c8fe 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
@@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.messaging.server;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.cloudstack.framework.messaging.TransportAddress;
 import org.apache.cloudstack.framework.messaging.TransportDataPdu;
@@ -28,20 +30,48 @@ import org.apache.cloudstack.framework.messaging.TransportEndpoint;
 import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
 import org.apache.cloudstack.framework.messaging.TransportPdu;
 import org.apache.cloudstack.framework.messaging.TransportProvider;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
 
 public class ServerTransportProvider implements TransportProvider {
+    private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
+	
+	public static final int DEFAULT_WORKER_POOL_SIZE = 5;
+	
 	private String _nodeId;
 
 	private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
+	private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
+	private ExecutorService _executor;
 	
 	private int _nextEndpointId = new Random().nextInt();
 	
 	public ServerTransportProvider() {
 	}
 	
-	public String getNodeId() { return _nodeId; }
-	public void setNodeId(String nodeId) {
+	public String getNodeId() { 
+		return _nodeId; 
+	}
+	
+	public ServerTransportProvider setNodeId(String nodeId) {
 		_nodeId = nodeId;
+		return this;
+	}
+	
+	public int getWorkerPoolSize() {
+		return _poolSize; 
+	}
+	
+	public ServerTransportProvider setWorkerPoolSize(int poolSize) {
+		assert(poolSize > 0);
+		
+		_poolSize = poolSize;
+		return this;
+	}
+	
+	public void initialize() {
+		_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
 	}
 	
 	@Override
@@ -64,7 +94,7 @@ public class ServerTransportProvider implements TransportProvider {
 				// already attached
 				return endpointSite;
 			}
-			endpointSite = new TransportEndpointSite(endpoint, transportAddress);
+			endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
 			_endpointMap.put(endpointId, endpointSite);
 		}
 		
@@ -87,6 +117,22 @@ public class ServerTransportProvider implements TransportProvider {
 	}
 	
 	@Override
+	public void requestSiteOutput(final TransportEndpointSite site) {
+		_executor.execute(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					site.processOutput();
+					site.ackOutputProcessSignal();
+				} catch(Throwable e) {
+					s_logger.error("Unhandled exception", e);
+				}
+			}
+		});
+	}
+	
+	@Override
 	public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, 
 		String multiplexier, String message) {
 		


Mime
View raw message