incubator-cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: Architecture refactoring - Stateless management server - EventBus
Date Tue, 16 Oct 2012 01:11:09 GMT
Updated Branches:
  refs/heads/javelin 0a7d03c90 -> 24530a274


Architecture refactoring - Stateless management server - EventBus


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/24530a27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/24530a27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/24530a27

Branch: refs/heads/javelin
Commit: 24530a274e44279c337b145e0432f64b67545bfa
Parents: 0a7d03c
Author: Kelven Yang <kelven.yang@citrix.com>
Authored: Mon Oct 15 18:03:09 2012 -0700
Committer: Kelven Yang <kelven.yang@citrix.com>
Committed: Mon Oct 15 18:10:38 2012 -0700

----------------------------------------------------------------------
 utils/src/com/cloud/utils/events/EventBus.java     |   25 ++
 utils/src/com/cloud/utils/events/EventBusBase.java |  292 +++++++++++++++
 utils/src/com/cloud/utils/events/EventsTest.java   |   66 ----
 utils/src/com/cloud/utils/events/PublishScope.java |   23 ++
 utils/src/com/cloud/utils/events/Subscriber.java   |   22 ++
 5 files changed, 362 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventBus.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventBus.java b/utils/src/com/cloud/utils/events/EventBus.java
new file mode 100644
index 0000000..4195acd
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/EventBus.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+public interface EventBus {
+	void subscribe(String subject, Subscriber subscriber);
+	void unsubscribe(String subject, Subscriber subscriber);
+	
+	void publish(String subject, PublishScope scope, Object sender, String args);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventBusBase.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventBusBase.java b/utils/src/com/cloud/utils/events/EventBusBase.java
new file mode 100644
index 0000000..0c135db
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/EventBusBase.java
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+import edu.emory.mathcs.backport.java.util.Collections;
+
+public class EventBusBase implements EventBus {
+
+	private Gate _gate;
+	private List<ActionRecord> _pendingActions;
+	
+	private SubscriptionNode _subscriberRoot;
+	
+	public EventBusBase() {
+		_gate = new Gate();
+		_pendingActions = new ArrayList<ActionRecord>();
+		
+		_subscriberRoot = new SubscriptionNode("/", null);
+	}
+	
+	@Override
+	public void subscribe(String subject, Subscriber subscriber) {
+		assert(subject != null);
+		assert(subscriber != null);
+		if(_gate.enter()) {
+			SubscriptionNode current = locate(subject, null, true);
+			assert(current != null);
+			current.addSubscriber(subscriber);
+			_gate.leave();
+		} else {
+			synchronized(_pendingActions) {
+				_pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber));
+			}
+		}
+	}
+
+	@Override
+	public void unsubscribe(String subject, Subscriber subscriber) {
+		if(_gate.enter()) {
+			SubscriptionNode current = locate(subject, null, false);
+			if(current != null)
+				current.removeSubscriber(subscriber);
+			
+			_gate.leave();
+		} else {
+			synchronized(_pendingActions) {
+				_pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
+			}
+		}
+	}
+
+	@Override
+	public void publish(String subject, PublishScope scope, Object sender,
+		String args) {
+		
+		if(_gate.enter(true)) {
+
+			List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
+			SubscriptionNode current = locate(subject, chainFromTop, false);
+			
+			if(current != null)
+				current.notifySubscribers(subject, sender, args);
+			
+			Collections.reverse(chainFromTop);
+			for(SubscriptionNode node : chainFromTop)
+				node.notifySubscribers(subject, sender, args);
+			
+			_gate.leave();
+		}
+	}
+	
+	private void onGateOpen() {
+		synchronized(_pendingActions) {
+			ActionRecord record = null;
+			if(_pendingActions.size() > 0) {
+				while((record = _pendingActions.remove(0)) != null) {
+					switch(record.getType()) {
+					case Subscribe :
+						{
+							SubscriptionNode current = locate(record.getSubject(), null, true);
+							assert(current != null);
+							current.addSubscriber(record.getSubscriber());
+						}
+						break;
+						
+					case Unsubscribe :
+						{
+							SubscriptionNode current = locate(record.getSubject(), null, false);
+							if(current != null)
+								current.removeSubscriber(record.getSubscriber());
+						}
+						break;
+						
+					default :
+						assert(false);
+						break;
+					
+					}
+				}
+			}
+		}
+	}
+	
+	
+	private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
+		boolean createPath) {
+		
+		assert(subject != null);
+		
+		String[] subjectPathTokens = subject.split("\\.");
+		return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
+	}
+	
+	private static SubscriptionNode locate(String[] subjectPathTokens, 
+		SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath)
{
+		
+		assert(current != null);
+		assert(subjectPathTokens != null);
+		assert(subjectPathTokens.length > 0);
+
+		if(chainFromTop != null)
+			chainFromTop.add(current);
+		
+		SubscriptionNode next = current.getChild(subjectPathTokens[0]);
+		if(next == null) {
+			if(createPath) {
+				next = new SubscriptionNode(subjectPathTokens[0], null);
+				current.addChild(subjectPathTokens[0], next);
+			} else {
+				return null;
+			}
+		}
+		
+		if(subjectPathTokens.length > 1) {
+			return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length),
+				next, chainFromTop, createPath);
+		} else {
+			return next;
+		}
+	}
+	
+	
+	//
+	// Support inner classes
+	//
+	private static enum ActionType {
+		Subscribe,
+		Unsubscribe
+	}
+	
+	private static class ActionRecord {
+		private ActionType _type;
+		private String _subject;
+		private Subscriber _subscriber;
+		
+		public ActionRecord(ActionType type, String subject, Subscriber subscriber) {
+			_type = type;
+			_subject = subject;
+			_subscriber = subscriber;
+		}
+		
+		public ActionType getType() { 
+			return _type; 
+		}
+		
+		public String getSubject() {
+			return _subject;
+		}
+		
+		public Subscriber getSubscriber() {
+			return _subscriber;
+		}
+	}
+	
+	private class Gate {
+		private int _reentranceCount;
+		private Thread _gateOwner;
+		
+		public Gate() {
+			_reentranceCount = 0;
+			_gateOwner = null;
+		}
+		
+		public boolean enter() {
+			return enter(false);
+		}
+		
+		public boolean enter(boolean wait) {
+			while(true) {
+				synchronized(this) {
+					if(_reentranceCount == 0) {
+						assert(_gateOwner == null);
+						
+						_reentranceCount++;
+						_gateOwner = Thread.currentThread();
+						return true;
+					} else {
+						if(wait) {
+							try {
+								wait();
+							} catch (InterruptedException e) {
+							}
+						} else {
+							break;
+						}
+					}
+				}
+			}
+			
+			return false;
+		}
+		
+		public void leave() {
+			synchronized(this) {
+				if(_reentranceCount > 0) {
+					assert(_gateOwner == Thread.currentThread());
+					
+					onGateOpen();
+					_reentranceCount--;
+					assert(_reentranceCount == 0);
+					_gateOwner = null;
+					
+					notifyAll();
+				}
+			}
+		}
+	}
+	
+	private static class SubscriptionNode {
+		private String _nodeKey;
+		private List<Subscriber> _subscribers;
+		private Map<String, SubscriptionNode> _children;
+		
+		public SubscriptionNode(String nodeKey, Subscriber subscriber) {
+			assert(nodeKey != null);
+			_nodeKey = nodeKey;
+			_subscribers = new ArrayList<Subscriber>();
+			
+			if(subscriber != null)
+				_subscribers.add(subscriber);
+			
+			_children = new HashMap<String, SubscriptionNode>();
+		}
+		
+		public List<Subscriber> getSubscriber() {
+			return _subscribers;
+		}
+		
+		public void addSubscriber(Subscriber subscriber) {
+			_subscribers.add(subscriber);
+		}
+		
+		public void removeSubscriber(Subscriber subscriber) {
+			_subscribers.remove(subscriber);
+		}
+		
+		public SubscriptionNode getChild(String key) {
+			return _children.get(key);
+		}
+		
+		public void addChild(String key, SubscriptionNode childNode) {
+			_children.put(key, childNode);
+		}
+		
+		public void notifySubscribers(String subject, Object sender, String args) {
+			for(Subscriber subscriber : _subscribers) {
+				subscriber.onPublishEvent(subject, sender, args);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/EventsTest.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/EventsTest.java b/utils/src/com/cloud/utils/events/EventsTest.java
deleted file mode 100644
index abef23e..0000000
--- a/utils/src/com/cloud/utils/events/EventsTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.utils.events;
-
-public class EventsTest {
-	public void onWeatherChange(Object sender, EventArgs args) {
-		System.out.println("onWeatherChange, weather: " + ((WeatherChangeEventArgs)args).getWeather());
-	}
-	
-	public void onTrafficChange(Object sender, EventArgs args) {
-		System.out.println("onTrafficChange");
-	}
-	
-	public void run() {
-		SubscriptionMgr mgr = SubscriptionMgr.getInstance();
-		try {
-			mgr.subscribe("weather", this, "onWeatherChange");
-			mgr.subscribe("traffic", this, "onTrafficChange");
-		} catch (SecurityException e) {
-			e.printStackTrace();
-		} catch (NoSuchMethodException e) {
-			e.printStackTrace();
-		}
-		
-		mgr.notifySubscribers("weather", null, new WeatherChangeEventArgs("weather", "Sunny"));
-		mgr.notifySubscribers("traffic", null, EventArgs.Empty);
-	}
-
-	public static void main(String[] args) {
-		EventsTest test = new EventsTest();
-		test.run();
-	}
-}
-
-class WeatherChangeEventArgs extends EventArgs {
-	private static final long serialVersionUID = -952166331523609047L;
-	
-	private String weather;
-	
-	public WeatherChangeEventArgs() {
-	}
-	
-	public WeatherChangeEventArgs(String subject, String weather) {
-		super(subject);
-		this.weather = weather;
-	}
-	
-	public String getWeather() { return weather; }
-	public void setWeather(String weather) {
-		this.weather = weather;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/PublishScope.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/PublishScope.java b/utils/src/com/cloud/utils/events/PublishScope.java
new file mode 100644
index 0000000..bab852a
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/PublishScope.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+public enum PublishScope {
+	LOCAL,
+	GLOBAL
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/24530a27/utils/src/com/cloud/utils/events/Subscriber.java
----------------------------------------------------------------------
diff --git a/utils/src/com/cloud/utils/events/Subscriber.java b/utils/src/com/cloud/utils/events/Subscriber.java
new file mode 100644
index 0000000..7af283b
--- /dev/null
+++ b/utils/src/com/cloud/utils/events/Subscriber.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.utils.events;
+
+public interface Subscriber {
+	void onPublishEvent(String subject, Object sender, String args);
+}


Mime
View raw message