activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Implementing AMQ-4788 - Adding tests for the ZooKeeper variant of the partition broker plugin.
Date Tue, 08 Oct 2013 15:09:42 GMT
Updated Branches:
  refs/heads/trunk ef64b057a -> 25f70ad48


Implementing AMQ-4788 - Adding tests for the ZooKeeper variant of the partition broker plugin.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25f70ad4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25f70ad4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25f70ad4

Branch: refs/heads/trunk
Commit: 25f70ad483ec8211b9269bf8a33c7bb2d750866e
Parents: ef64b05
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Tue Oct 8 11:09:34 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Tue Oct 8 11:09:39 2013 -0400

----------------------------------------------------------------------
 .../activemq/partition/PartitionBroker.java     | 23 +++--
 .../partition/PartitionBrokerPlugin.java        |  6 ++
 .../partition/ZooKeeperPartitionBroker.java     | 13 +++
 .../activemq/partition/PartitionBrokerTest.java | 60 +++++++++---
 .../partition/ZooKeeperPartitionBrokerTest.java | 99 ++++++++++++++++++++
 5 files changed, 182 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25f70ad4/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
index 5190207..6ae7990 100644
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
@@ -130,6 +130,7 @@ public class PartitionBroker extends BrokerFilter {
         String connectionString = getConnectionString(targetDTO.ids);
         if( connectionString==null ) {
             LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
+            return;
         }
 
         LOG.info("Redirecting connection to: " + connectionString);
@@ -141,11 +142,9 @@ public class PartitionBroker extends BrokerFilter {
     }
 
     protected String getConnectionString(HashSet<String> ids) {
-        if( getConfig().brokers==null || getConfig().brokers.isEmpty() )
-            return null;
         StringBuilder rc = new StringBuilder();
         for (String id : ids) {
-            String url = getConfig().brokers.get(id);
+            String url = plugin.getBrokerURL(this, id);
             if( url!=null ) {
                 if( rc.length()!=0 ) {
                     rc.append(',');
@@ -153,6 +152,8 @@ public class PartitionBroker extends BrokerFilter {
                 rc.append(url);
             }
         }
+        if( rc.length()==0 )
+            return null;
         return rc.toString();
     }
 
@@ -270,16 +271,22 @@ public class PartitionBroker extends BrokerFilter {
 
     @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
-        ConnectionMonitor monitor = new ConnectionMonitor(context);
-        monitors.put(info.getConnectionId(), monitor);
-        super.addConnection(context, info);
-        checkTarget(monitor);
+        if( info.isFaultTolerant() ) {
+            ConnectionMonitor monitor = new ConnectionMonitor(context);
+            monitors.put(info.getConnectionId(), monitor);
+            super.addConnection(context, info);
+            checkTarget(monitor);
+        } else {
+            super.addConnection(context, info);
+        }
     }
 
     @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
         super.removeConnection(context, info, error);
-        ConnectionMonitor removed = monitors.remove(info.getConnectionId());
+        if( info.isFaultTolerant() ) {
+            monitors.remove(info.getConnectionId());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/25f70ad4/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
index 815687f..f5b4342 100644
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
@@ -58,4 +58,10 @@ public class PartitionBrokerPlugin implements BrokerPlugin {
         this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
     }
 
+    public String getBrokerURL(PartitionBroker partitionBroker, String id) {
+        if( config!=null && config.brokers!=null ) {
+            return config.brokers.get(id);
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25f70ad4/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
index 2c18f2d..b37cdbe 100644
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
@@ -26,6 +26,9 @@ import org.linkedin.util.clock.Timespan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  */
 public class ZooKeeperPartitionBroker extends PartitionBroker {
@@ -34,11 +37,20 @@ public class ZooKeeperPartitionBroker extends PartitionBroker {
 
     protected volatile ZKClient zk_client = null;
     protected volatile Partitioning config;
+    protected final CountDownLatch configAcquired = new CountDownLatch(1);
 
     public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
         super(broker, plugin);
     }
 
+    @Override
+    public void start() throws Exception {
+        super.start();
+        // Lets block a bit until we get our config.. Otherwise just keep
+        // on going.. not a big deal if we get our config later.  Perhaps
+        // ZK service is not having a good day.
+        configAcquired.await(5, TimeUnit.SECONDS);
+    }
 
     @Override
     protected void onMonitorStop() {
@@ -96,6 +108,7 @@ public class ZooKeeperPartitionBroker extends PartitionBroker {
                     monitorWakeup();
                 }
             }, stat);
+            configAcquired.countDown();
             reloadConfigOnPoll = false;
         } catch (Exception e) {
             LOG.warn("Could load partitioning configuration: " + e, e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/25f70ad4/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
index efb5e66..b75c439 100644
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
+++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
@@ -23,6 +23,9 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.partition.dto.Partitioning;
 import org.apache.activemq.partition.dto.Target;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import javax.jms.*;
 import java.io.IOException;
@@ -31,23 +34,51 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.*;
+
 /**
  * Unit tests for the PartitionBroker plugin.
  */
-public class PartitionBrokerTest extends AutoFailTestSupport {
+public class PartitionBrokerTest {
 
     protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
     protected ArrayList<Connection> connections = new ArrayList<Connection>();
     Partitioning partitioning;
 
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
+    @Before
+    public void setUp() throws Exception {
         partitioning = new Partitioning();
         partitioning.brokers = new HashMap<String, String>();
     }
 
+    /**
+     * Partitioning can only re-direct failover clients since those
+     * can re-connect and re-establish their state with another broker.
+     */
+    @Test(timeout = 1000*60*60)
+    public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
+
+        partitioning.byClientId = new HashMap<String, Target>();
+        partitioning.byClientId.put("client1", new Target("broker1"));
+        createBrokerCluster(2);
 
+        Connection connection = createConnectionToUrl(getConnectURL("broker2"));
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(0, getTransportConnector("broker1").getConnections().size());
+                assertEquals(1, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+
+        connection.setClientID("client1");
+        connection.start();
+
+        Thread.sleep(1000);
+        assertEquals(0, getTransportConnector("broker1").getConnections().size());
+        assertEquals(1, getTransportConnector("broker2").getConnections().size());
+    }
+
+    @Test(timeout = 1000*60*60)
     public void testPartitionByClientId() throws Exception {
         partitioning.byClientId = new HashMap<String, Target>();
         partitioning.byClientId.put("client1", new Target("broker1"));
@@ -73,6 +104,7 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
         });
     }
 
+    @Test(timeout = 1000*60*60)
     public void testPartitionByQueue() throws Exception {
         partitioning.byQueue = new HashMap<String, Target>();
         partitioning.byQueue.put("foo", new Target("broker1"));
@@ -149,7 +181,10 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
     }
 
     protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
-        String url = "failover://(" + getConnectURL(brokerId) + ")";
+        return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")");
+    }
+
+    private Connection createConnectionToUrl(String url) throws JMSException {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
         Connection connection = factory.createConnection();
         connections.add(connection);
@@ -174,16 +209,20 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
             String brokerId = "broker" + i;
             BrokerService broker = createBroker(brokerId);
             broker.setPersistent(false);
-            PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
-            plugin.setConfig(partitioning);
-            broker.setPlugins(new BrokerPlugin[]{plugin});
             broker.addConnector("tcp://localhost:0").setName("tcp");
+            addPartitionBrokerPlugin(broker);
             broker.start();
             broker.waitUntilStarted();
             partitioning.brokers.put(brokerId, getConnectURL(brokerId));
         }
     }
 
+    protected void addPartitionBrokerPlugin(BrokerService broker) {
+        PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
+        plugin.setConfig(partitioning);
+        broker.setPlugins(new BrokerPlugin[]{plugin});
+    }
+
     protected BrokerService createBroker(String name) {
         BrokerService broker = new BrokerService();
         broker.setBrokerName(name);
@@ -191,8 +230,8 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
         return broker;
     }
 
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         for (Connection connection : connections) {
             try {
                 connection.close();
@@ -208,7 +247,6 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
             }
         }
         brokers.clear();
-        super.tearDown();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25f70ad4/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
new file mode 100644
index 0000000..45193bb
--- /dev/null
+++ b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.replicated.groups.ZKClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.junit.After;
+import org.junit.Before;
+import org.linkedin.util.clock.Timespan;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+
+/**
+ */
+public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
+
+    NIOServerCnxnFactory connector;
+
+    @Before
+    public void setUp() throws Exception {
+        System.out.println("Starting ZooKeeper");
+        ZooKeeperServer zk_server = new ZooKeeperServer();
+        zk_server.setTickTime(500);
+        zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
+        connector = new NIOServerCnxnFactory();
+        connector.configure(new InetSocketAddress(0), 100);
+        connector.startup(zk_server);
+        System.out.println("ZooKeeper Started");
+        super.setUp();
+    }
+
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if( connector!=null ) {
+          connector.shutdown();
+          connector = null;
+        }
+    }
+
+    String zkPath = "/partition-config";
+
+    @Override
+    protected void createBrokerCluster(int brokerCount) throws Exception {
+        // Store the partitioning in ZK.
+        ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
+        try {
+            zk_client.start();
+            zk_client.waitForConnected(Timespan.parse("30s"));
+            try {
+                zk_client.delete(zkPath);
+            } catch (Throwable e) {
+            }
+            zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
+        } finally {
+            zk_client.close();
+        }
+        super.createBrokerCluster(brokerCount);
+    }
+
+    @Override
+    protected void addPartitionBrokerPlugin(BrokerService broker) {
+        // Have the broker plugin get the partition config via ZK.
+        ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
+            @Override
+            public String getBrokerURL(PartitionBroker partitionBroker, String id) {
+                try {
+                    return getConnectURL(id);
+                } catch (Exception e) {
+                    return null;
+                }
+            }
+        };
+        plugin.setZkAddress("localhost:" + connector.getLocalPort());
+        plugin.setZkPath(zkPath);
+        broker.setPlugins(new BrokerPlugin[]{plugin});
+    }
+}


Mime
View raw message