cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yas...@apache.org
Subject [29/52] [abbrv] git commit: updated refs/heads/pvlan to 3c3d677
Date Thu, 02 May 2013 21:56:04 GMT
add unit test to message bus in master branch


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/d44e25ef
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/d44e25ef
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/d44e25ef

Branch: refs/heads/pvlan
Commit: d44e25efba72b03b114b5662c60d94ddcc3264f8
Parents: 85e73d1
Author: Kelven Yang <kelveny@gmail.com>
Authored: Mon Apr 29 14:35:41 2013 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Mon Apr 29 14:36:03 2013 -0700

----------------------------------------------------------------------
 .../framework/messagebus/MessageBus.java           |    2 +
 .../framework/messagebus/MessageBusBase.java       |  130 +++++++++++++--
 .../cloudstack/messagebus/TestMessageBus.java      |  116 +++++++++++++
 .../ipc/test/resources/MessageBusTestContext.xml   |   51 ++++++
 4 files changed, 284 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
index 4aa007d..a15dd44 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
@@ -27,6 +27,8 @@ public interface MessageBus {
 	
 	void subscribe(String subject, MessageSubscriber subscriber);
 	void unsubscribe(String subject, MessageSubscriber subscriber);
+	void clearAll();
+	void prune();
 	
 	void publish(String senderAddress, String subject, PublishScope scope, Object args);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index 5b7af4d..9cf5e77 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -40,7 +40,7 @@ public class MessageBusBase implements MessageBus {
 		_gate = new Gate();
 		_pendingActions = new ArrayList<ActionRecord>();
 		
-		_subscriberRoot = new SubscriptionNode("/", null);
+		_subscriberRoot = new SubscriptionNode(null, "/", null);
 	}
 	
 	@Override
@@ -72,10 +72,13 @@ public class MessageBusBase implements MessageBus {
 	@Override
 	public void unsubscribe(String subject, MessageSubscriber subscriber) {
 		if(_gate.enter()) {
-			SubscriptionNode current = locate(subject, null, false);
-			if(current != null)
-				current.removeSubscriber(subscriber);
-			
+			if(subject != null) {
+				SubscriptionNode current = locate(subject, null, false);
+				if(current != null)
+					current.removeSubscriber(subscriber, false);
+			} else {
+				this._subscriberRoot.removeSubscriber(subscriber, true);
+			}
 			_gate.leave();
 		} else {
 			synchronized(_pendingActions) {
@@ -83,7 +86,48 @@ public class MessageBusBase implements MessageBus {
 			}
 		}
 	}
-
+	
+	@Override
+	public void clearAll() {
+		if(_gate.enter()) {
+			_subscriberRoot.clearAll();
+			doPrune();
+			_gate.leave();
+		} else {
+			synchronized(_pendingActions) {
+				_pendingActions.add(new ActionRecord(ActionType.ClearAll, null, null));
+			}
+		}
+	}
+		
+	@Override
+	public void prune() {
+		if(_gate.enter()) {
+			doPrune();
+			_gate.leave();
+		} else {
+			synchronized(_pendingActions) {
+				_pendingActions.add(new ActionRecord(ActionType.Prune, null, null));
+			}
+		}
+	}
+	
+	private void doPrune() {
+		List<SubscriptionNode> trimNodes = new ArrayList<SubscriptionNode>();
+		_subscriberRoot.prune(trimNodes);
+		
+		while(trimNodes.size() > 0) {
+			SubscriptionNode node = trimNodes.remove(0);
+			SubscriptionNode parent = node.getParent();
+			if(parent != null) {
+				parent.removeChild(node.getNodeKey());
+				if(parent.isTrimmable()) {
+					trimNodes.add(parent);
+				}
+			}
+		}
+	}
+	
 	@Override
 	public void publish(String senderAddress, String subject, PublishScope scope, 
 		Object args) {
@@ -119,12 +163,22 @@ public class MessageBusBase implements MessageBus {
 						break;
 						
 					case Unsubscribe :
-						{
+						if(record.getSubject() != null) {
 							SubscriptionNode current = locate(record.getSubject(), null, false);
 							if(current != null)
-								current.removeSubscriber(record.getSubscriber());
+								current.removeSubscriber(record.getSubscriber(), false);
+						} else {
+							this._subscriberRoot.removeSubscriber(record.getSubscriber(), true);
 						}
 						break;
+					
+					case ClearAll :
+						_subscriberRoot.clearAll();
+						break;
+						
+					case Prune :
+						doPrune();
+						break;
 						
 					default :
 						assert(false);
@@ -136,11 +190,13 @@ public class MessageBusBase implements MessageBus {
 		}
 	}
 	
-	
 	private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
 		boolean createPath) {
 		
 		assert(subject != null);
+		// "/" is special name for root node
+		if(subject.equals("/"))
+			return _subscriberRoot;
 		
 		String[] subjectPathTokens = subject.split("\\.");
 		return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
@@ -159,7 +215,7 @@ public class MessageBusBase implements MessageBus {
 		SubscriptionNode next = current.getChild(subjectPathTokens[0]);
 		if(next == null) {
 			if(createPath) {
-				next = new SubscriptionNode(subjectPathTokens[0], null);
+				next = new SubscriptionNode(current, subjectPathTokens[0], null);
 				current.addChild(subjectPathTokens[0], next);
 			} else {
 				return null;
@@ -180,7 +236,9 @@ public class MessageBusBase implements MessageBus {
 	//
 	private static enum ActionType {
 		Subscribe,
-		Unsubscribe
+		Unsubscribe,
+		ClearAll,
+		Prune
 	}
 	
 	private static class ActionRecord {
@@ -262,13 +320,14 @@ public class MessageBusBase implements MessageBus {
 	}
 	
 	private static class SubscriptionNode {
-		@SuppressWarnings("unused")
 		private String _nodeKey;
 		private List<MessageSubscriber> _subscribers;
 		private Map<String, SubscriptionNode> _children;
+		private SubscriptionNode _parent;
 		
-		public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
+		public SubscriptionNode(SubscriptionNode parent, String nodeKey, MessageSubscriber subscriber)
{
 			assert(nodeKey != null);
+			_parent = parent;
 			_nodeKey = nodeKey;
 			_subscribers = new ArrayList<MessageSubscriber>();
 			
@@ -278,16 +337,30 @@ public class MessageBusBase implements MessageBus {
 			_children = new HashMap<String, SubscriptionNode>();
 		}
 		
+		public SubscriptionNode getParent() {
+			return _parent;			
+		}
+		
+		public String getNodeKey() {
+			return _nodeKey;
+		}
+		
 		@SuppressWarnings("unused")
 		public List<MessageSubscriber> getSubscriber() {
 			return _subscribers;
 		}
 		
 		public void addSubscriber(MessageSubscriber subscriber) {
-			_subscribers.add(subscriber);
+			if(!_subscribers.contains(subscriber))
+				_subscribers.add(subscriber);
 		}
 		
-		public void removeSubscriber(MessageSubscriber subscriber) {
+		public void removeSubscriber(MessageSubscriber subscriber, boolean recursively) {
+			if(recursively) {
+				for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
+					entry.getValue().removeSubscriber(subscriber, true);
+				}
+			}
 			_subscribers.remove(subscriber);
 		}
 		
@@ -299,10 +372,37 @@ public class MessageBusBase implements MessageBus {
 			_children.put(key, childNode);
 		}
 		
+		public void removeChild(String key) {
+			_children.remove(key);
+		}
+		
+		public void clearAll() {
+			// depth-first
+			for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
+				entry.getValue().clearAll();
+			}
+			_subscribers.clear();
+		}
+		
+		public void prune(List<SubscriptionNode> trimNodes) {
+			assert(trimNodes != null);
+			
+			for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
+				entry.getValue().prune(trimNodes);
+			}
+			
+			if(isTrimmable())
+				trimNodes.add(this);
+		}
+		
 		public void notifySubscribers(String senderAddress, String subject,  Object args) {
 			for(MessageSubscriber subscriber : _subscribers) {
 				subscriber.onPublishMessage(senderAddress, subject, args);
 			}
 		}
+		
+		public boolean isTrimmable() {
+			return _children.size() == 0 && _subscribers.size() == 0;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
new file mode 100644
index 0000000..dabfdd3
--- /dev/null
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.messagebus;
+
+import javax.inject.Inject;
+
+import junit.framework.TestCase;
+
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
+import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations="classpath:/MessageBusTestContext.xml")
+public class TestMessageBus extends TestCase {
+	
+	@Inject MessageBus _messageBus;
+
+	@Test
+	public void testExactSubjectMatch() {
+		_messageBus.subscribe("Host", new MessageSubscriber() {
+
+			@Override
+			public void onPublishMessage(String senderAddress, String subject, Object args) {
+				Assert.assertEquals(subject, "Host");
+			}
+		});
+		
+		_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+		_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+		_messageBus.clearAll();
+	}
+
+	@Test
+	public void testRootSubjectMatch() {
+		_messageBus.subscribe("/", new MessageSubscriber() {
+
+			@Override
+			public void onPublishMessage(String senderAddress, String subject, Object args) {
+				Assert.assertTrue(subject.equals("Host") || subject.equals("VM"));
+			}
+		});
+		
+		_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+		_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+		_messageBus.clearAll();
+	}
+	
+	@Test
+	public void testMiscMatch() {
+		MessageSubscriber subscriberAtParentLevel = new MessageSubscriber() {
+			@Override
+			public void onPublishMessage(String senderAddress, String subject, Object args) {
+				Assert.assertTrue(subject.startsWith(("Host")) || subject.startsWith("VM"));
+			}
+		};
+		
+		MessageSubscriber subscriberAtChildLevel = new MessageSubscriber() {
+			@Override
+			public void onPublishMessage(String senderAddress, String subject, Object args) {
+				Assert.assertTrue(subject.equals("Host.123"));
+			}
+		};
+		
+		subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel);
+		subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel);
+		
+		_messageBus.subscribe("Host", subscriberAtParentLevel);
+		_messageBus.subscribe("VM", subscriberAtParentLevel);
+		_messageBus.subscribe("Host.123", subscriberAtChildLevel);
+		
+		_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+		_messageBus.publish(null, "Host.321", PublishScope.LOCAL, null);
+		_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+		
+		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.123", null);
+		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.321", null);
+		Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "VM.123", null);
+		Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
+
+		Mockito.reset(subscriberAtParentLevel);
+		Mockito.reset(subscriberAtChildLevel);
+		
+		_messageBus.unsubscribe(null, subscriberAtParentLevel);
+		_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+		_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+	
+		Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
+		Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "Host.123",
null);
+		Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "VM.123",
null);
+		
+		_messageBus.clearAll();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/resources/MessageBusTestContext.xml
----------------------------------------------------------------------
diff --git a/framework/ipc/test/resources/MessageBusTestContext.xml b/framework/ipc/test/resources/MessageBusTestContext.xml
new file mode 100644
index 0000000..fcfcb08
--- /dev/null
+++ b/framework/ipc/test/resources/MessageBusTestContext.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+  xmlns:context="http://www.springframework.org/schema/context"
+  xmlns:tx="http://www.springframework.org/schema/tx" 
+  xmlns:aop="http://www.springframework.org/schema/aop"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans
+                      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+                      http://www.springframework.org/schema/tx 
+                      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
+                      http://www.springframework.org/schema/aop
+                      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
+                      http://www.springframework.org/schema/context
+                      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
                    
+  <context:annotation-config />
+
+  <bean id="onwireRegistry" class="org.apache.cloudstack.framework.serializer.OnwireClassRegistry"
+    init-method="scan" >
+    <property name="packages">
+      <list>
+        <value>org.apache.cloudstack.framework</value>
+      </list>
+    </property>
+  </bean>
+  
+  <bean id="messageSerializer" class="org.apache.cloudstack.framework.serializer.JsonMessageSerializer">
+    <property name="onwireClassRegistry" ref="onwireRegistry" />
+  </bean>
+
+  <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase">
+    <property name="messageSerializer" ref="messageSerializer" />
+  </bean>
+  
+</beans>


Mime
View raw message