activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r685612 [3/3] - in /activemq/sandbox/groupmq: ./ eclipse-classes/ eclipse-classes/META-INF/ eclipse-classes/org/ eclipse-classes/org/apache/ eclipse-classes/org/apache/group/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/or...
Date Wed, 13 Aug 2008 17:05:11 GMT
Added: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java (added)
+++ activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.Member;
+
+
+public class GroupMemberTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+        
+    public void testCoordinatorSelection() throws Exception{
+        Group group = new Group(null,"");
+        List<Member>list = new ArrayList<Member>();
+        final int number =10;
+        Member choosen = null;
+        for (int i =0;i< number;i++) {
+            Member m = new Member("group"+i);
+            m.setId(""+i);
+            if (number/2==i) {
+                m.setCoordinatorWeight(10);
+                choosen=m;
+            }
+            list.add(m);
+        }
+        Member c = group.selectCordinator(list);
+        assertEquals(c,choosen);
+    }
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * @throws Exception 
+     */
+    public void testGroup() throws Exception {
+        
+        final int number = 10;
+        List<Group>groupMaps = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i =0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            Group map = new Group(connection,"map"+i);
+            map.setHeartBeatInterval(200);
+            map.setMinimumGroupSize(i+1);
+            map.start();
+            groupMaps.add(map);
+        }
+        
+        int coordinatorNumber = 0;
+        for (Group map:groupMaps) {
+            if (map.isCoordinator()) {
+                coordinatorNumber++;
+            }
+        }
+        for(Group map:groupMaps) {
+            map.stop();
+        }
+        
+    }
+    
+public void testWeightedGroup() throws Exception {
+        
+        final int number = 10;
+        List<Group>groupMaps = new ArrayList<Group>();
+        Group last = null;
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i =0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            Group map = new Group(connection,"map"+i);
+            if(i ==number/2) {
+                map.setCoordinatorWeight(10);
+                last=map;
+            }
+            
+            map.setMinimumGroupSize(i+1);
+            map.start();
+            groupMaps.add(map);
+        }
+        Thread.sleep(2000);
+        int coordinator = 0;
+        Group groupCoordinator = null;
+        for (Group map:groupMaps) {
+            if (map.isCoordinator()) {
+                coordinator++;
+                groupCoordinator=map;
+            }
+        }
+             
+        
+        assertNotNull(groupCoordinator);
+        assertEquals(1,coordinator);
+        assertEquals(last.getName(),groupCoordinator.getName());
+        
+        for(Group map:groupMaps) {
+            map.stop();
+        }
+    }
+
+    
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}
+

Propchange: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java (added)
+++ activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.GroupMessageListener;
+import org.apache.groupmq.Member;
+
+public class GroupMessageTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    public void testGroupBroadcast() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Group> groups = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i = 0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            Group group = new Group(connection, "group" + i);            
+            group.setMinimumGroupSize(i+1);
+            group.start();
+            groups.add(group);
+            group.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    synchronized (count) {
+                        if (count.incrementAndGet() == number) {
+                            count.notifyAll();
+                        }
+                    }
+                }
+            });
+        }
+        groups.get(0).broadcastMessage("hello");
+        synchronized (count) {
+            if (count.get() < number) {
+                count.wait(5000);
+            }
+        }
+        assertEquals(number, count.get());
+        for (Group map : groups) {
+            map.stop();
+        }
+    }
+
+    public void testsendMessage() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Group> groups = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i = 0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            Group group = new Group(connection, "group" + i);
+            group.setMinimumGroupSize(i+1);
+            group.start();
+            groups.add(group);
+            group.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    synchronized (count) {
+                        count.incrementAndGet();
+                        count.notifyAll();
+                    }
+                }
+            });
+        }
+        groups.get(0).sendMessage("hello");
+        synchronized (count) {
+            if (count.get() == 0) {
+                count.wait(5000);
+            }
+        }
+        // wait a while to check that only one got it
+        Thread.sleep(2000);
+        assertEquals(1, count.get());
+        for (Group map : groups) {
+            map.stop();
+        }
+    }
+
+    public void testSendToSingleMember() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        Group group1 = new Group(connection1, "group1");
+        final AtomicBoolean called = new AtomicBoolean();
+        group1.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        group1.start();
+        Group group2 = new Group(connection2, "group2");
+        group2.setMinimumGroupSize(2);
+        group2.start();
+        Member member1 = group2.getMemberByName("group1");
+        group2.sendMessage(member1, "hello");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        group1.stop();
+        group2.stop();
+    }
+
+    public void testSendRequestReply() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        final int number = 1000;
+        final AtomicInteger requestCount = new AtomicInteger();
+        final AtomicInteger replyCount = new AtomicInteger();
+        final List<String> requests = new ArrayList<String>();
+        final List<String> replies = new ArrayList<String>();
+        for (int i = 0; i < number; i++) {
+            requests.add("request" + i);
+            replies.add("reply" + i);
+        }
+        final Group group1 = new Group(connection1, "group1");
+        final AtomicBoolean finished = new AtomicBoolean();
+        group1.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                if (!replies.isEmpty()) {
+                    String reply = replies.remove(0);
+                    try {
+                        group1.sendMessageResponse(sender, replyId, reply);
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                } 
+            }
+        });
+        group1.start();
+        final Group group2 = new Group(connection2, "group2");
+        group2.setMinimumGroupSize(2);
+        group2.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                if (!requests.isEmpty()) {
+                    String request = requests.remove(0);
+                    try {
+                        group2.sendMessage(sender, request);
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }else {
+                    synchronized (finished) {
+                        finished.set(true);
+                        finished.notifyAll();
+                    }
+                }
+            }
+        });
+        group2.start();
+        Member member1 = group2.getMemberByName("group1");
+        group2.sendMessage(member1, requests.remove(0));
+        synchronized (finished) {
+            if (!finished.get()) {
+                finished.wait(10000);
+            }
+        }
+        assertTrue(finished.get());
+        group1.stop();
+        group2.stop();
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java (added)
+++ activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.DefaultMapChangedListener;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.GroupUpdateException;
+import org.apache.groupmq.Member;
+import org.apache.groupmq.MemberChangedListener;
+
+
+public class GroupStateTest extends TestCase {
+    protected BrokerService broker;
+    protected Connection connection1;
+    protected Connection connection2;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * @throws Exception 
+     */
+    public void testAddMemberChangedListener() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        Group map1 = new Group(connection1,"map1");
+        map1.addMemberChangedListener(new MemberChangedListener(){
+
+            public void memberStarted(Member member) {
+                synchronized(counter) {
+                    counter.incrementAndGet();
+                    counter.notifyAll();
+                }
+                
+            }
+
+            public void memberStopped(Member member) {
+                synchronized(counter) {
+                    counter.decrementAndGet();
+                    counter.notifyAll();
+                }
+            }
+            
+        });
+        map1.start();
+        synchronized(counter) {
+            if (counter.get()<1) {
+                counter.wait(5000);
+            }
+        }
+        assertEquals(1, counter.get());
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        synchronized(counter) {
+            if (counter.get()<2) {
+                counter.wait(5000);
+            }
+        }
+        assertEquals(2, counter.get());
+        map2.stop();
+        synchronized(counter) {
+            if (counter.get()>=2) {
+                counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3);
+            }
+        }
+        assertEquals(1, counter.get());
+        map1.stop();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
+     * @throws Exception 
+     */
+    public void testAddMapChangedListener() throws Exception {
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        
+        Group map1 = new Group(connection1,"map1");
+        
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called1) {
+                    called1.set(true);
+                    called1.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        
+        Group map2 = new Group(connection2,"map2");
+        
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called2) {
+                    called2.set(true);
+                    called2.notifyAll();
+                }
+            }
+        });
+        map2.start();
+        
+        
+        map1.put("test", "blob");
+        synchronized(called1) {
+            if (!called1.get()) {
+               called1.wait(5000); 
+            }
+        }
+        synchronized(called2) {
+            if (!called2.get()) {
+               called2.wait(5000); 
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+        map1.stop();
+        map2.stop();
+    }
+   
+    public void testGetImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException e) {
+        }
+        map1.stop();
+        map2.stop();
+    }
+    
+    public void testExpireImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setLockTimeToLive(1000);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException e) {
+        }
+        Thread.sleep(2000);
+        map1.put("test", "bah");
+        map1.stop();
+        map2.stop();
+    }
+    
+    public void XtestExpireImplicitLockOnExit() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException e) {
+        }
+        map2.stop();
+        map1.put("test", "bah");
+        map1.stop();
+        
+    }
+    
+    public void testGetExplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.setAlwaysLock(true);
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        map2.lock("test");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException e) {
+        }
+        map2.unlock("test");
+        map1.lock("test");
+        try {
+            map2.lock("test");
+            fail("Should have thrown an exception!");
+        } catch (GroupUpdateException e) {
+        }
+        map1.stop();
+        map2.stop();
+    }
+    
+    
+
+    /**
+     * Test method for {@link org.apache.activemq.group.Group#clear()}.
+     * 
+     * @throws Exception
+     */
+    public void testClear() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+            
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.isEmpty()==false);
+        map2.clear();
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(map1.isEmpty());
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test a new map is populated for existing values
+     */
+    public void testMapUpdatedOnStart() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        
+        map1.start();
+        map1.put("test", "foo");
+        Group map2 = new Group(connection2,"map2");
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map2.start();
+       
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map2.containsKey("test"));
+        assertTrue(map2.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+   
+    public void testContainsKey() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.containsKey("test"));
+        map1.stop();
+        map2.stop();
+    }
+
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testContainsValue() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test method for {@link org.apache.activemq.group.GroupMap#entrySet()}.
+     * @throws Exception 
+     */
+    
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#get(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testGet() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        assertTrue(map1.get("test").equals("foo"));
+        map1.stop();
+        map2.stop();
+    }
+    
+    public void testPut() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        Object value = map1.put("foo", "blob");
+        assertNull(value);
+        value = map1.put("foo", "blah");
+        assertEquals(value, "blob");
+        map1.stop();
+        map2.stop();
+    }
+
+    
+    
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.Group#remove(java.lang.Object)}.
+     */
+    public void testRemove() throws Exception{
+        Group map1 = new Group(connection1,"map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapInsert(Member owner,Object Key, Object Value) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+            
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {
+                synchronized(called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.start();
+        map2.put("test","foo");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(map1.isEmpty()==false);
+        map2.remove("test");
+        synchronized(called) {
+            if (!called.get()) {
+               called.wait(5000); 
+            }
+        }
+        assertTrue(map1.isEmpty());
+        
+        map1.stop();
+        map2.stop();
+    }
+    
+    public void testExpire() throws Exception{
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        
+        Group map1 = new Group(connection1,"map1");
+        map1.setTimeToLive(1000);
+        map1.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {
+                synchronized(called1) {
+                    called1.set(expired);
+                    called1.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        
+        Group map2 = new Group(connection2,"map2");
+        
+        map2.addMapChangedListener(new DefaultMapChangedListener() {
+            public void mapRemove(Member owner, Object key, Object value,boolean expired) {
+                synchronized(called2) {
+                    called2.set(expired);
+                    called2.notifyAll();
+                }
+            }
+        });
+        map2.start();
+        
+        
+        map1.put("test", "blob");
+        synchronized(called1) {
+            if (!called1.get()) {
+               called1.wait(5000); 
+            }
+        }
+        synchronized(called2) {
+            if (!called2.get()) {
+               called2.wait(5000); 
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+        map1.stop();
+        map2.stop();
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection1 = factory.createConnection();
+        connection1.start();
+        connection2 = factory.createConnection();
+        connection2.start();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        connection1.close();
+        connection2.close();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message