activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Implementing AMQ-4788 - Add support for allowing the broker to partition client client load across a broker cluster using a partitioning config
Date Mon, 07 Oct 2013 14:20:16 GMT
Updated Branches:
  refs/heads/trunk 0da02fca9 -> 7c63788e1


Implementing AMQ-4788 - Add support for allowing the broker to partition client client load across a broker cluster using a partitioning config


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c63788e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c63788e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c63788e

Branch: refs/heads/trunk
Commit: 7c63788e1a60c7303103c54ff368ae128aca7933
Parents: 0da02fc
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Wed Oct 2 09:16:37 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Mon Oct 7 10:20:00 2013 -0400

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |   4 +
 activemq-partition/pom.xml                      | 110 +++++++
 .../activemq/partition/ConnectionProxy.java     | 138 ++++++++
 .../activemq/partition/PartitionBroker.java     | 322 +++++++++++++++++++
 .../partition/PartitionBrokerPlugin.java        |  53 +++
 .../partition/ZooKeeperPartitionBroker.java     | 112 +++++++
 .../ZooKeeperPartitionBrokerPlugin.java         |  68 ++++
 .../activemq/partition/dto/Partitioning.java    | 152 +++++++++
 .../apache/activemq/partition/dto/Target.java   |  62 ++++
 .../activemq/partition/PartitionBrokerTest.java | 196 +++++++++++
 activemq-spring/pom.xml                         |   1 +
 pom.xml                                         |   1 +
 12 files changed, 1219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 725e83d..1ce75a5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1347,6 +1347,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         return transport.getRemoteAddress();
     }
 
+    public Transport getTransport() {
+        return transport;
+    }
+
     @Override
     public String getConnectionId() {
         List<TransportConnectionState> connectionStates = listConnectionStates();

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml
new file mode 100644
index 0000000..25f070e
--- /dev/null
+++ b/activemq-partition/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.9-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-partition</artifactId>
+  <packaging>jar</packaging>
+
+  <name>ActiveMQ :: Partition Management</name>
+  <description>Used to partition clients over a cluster of brokers</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-leveldb-store</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.zookeeper-impl</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.util-core</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper-version}</version>
+    </dependency>
+
+    <!-- For Optional Snappy Compression -->
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${jackson-version}</version>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
new file mode 100644
index 0000000..cab6eb6
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.Connector;
+import org.apache.activemq.broker.region.ConnectionStatistics;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.Response;
+
+import java.io.IOException;
+
+/**
+ * A Connection implementation that proxies all Connection invocation to
+ * a delegate connection.
+ */
+public class ConnectionProxy implements Connection {
+    final Connection next;
+
+    public ConnectionProxy(Connection next) {
+        this.next = next;
+    }
+
+    @Override
+    public void dispatchAsync(Command command) {
+        next.dispatchAsync(command);
+    }
+
+    @Override
+    public void dispatchSync(Command message) {
+        next.dispatchSync(message);
+    }
+
+    @Override
+    public String getConnectionId() {
+        return next.getConnectionId();
+    }
+
+    @Override
+    public Connector getConnector() {
+        return next.getConnector();
+    }
+
+    @Override
+    public int getDispatchQueueSize() {
+        return next.getDispatchQueueSize();
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return next.getRemoteAddress();
+    }
+
+    @Override
+    public ConnectionStatistics getStatistics() {
+        return next.getStatistics();
+    }
+
+    @Override
+    public boolean isActive() {
+        return next.isActive();
+    }
+
+    @Override
+    public boolean isBlocked() {
+        return next.isBlocked();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return next.isConnected();
+    }
+
+    @Override
+    public boolean isFaultTolerantConnection() {
+        return next.isFaultTolerantConnection();
+    }
+
+    @Override
+    public boolean isManageable() {
+        return next.isManageable();
+    }
+
+    @Override
+    public boolean isNetworkConnection() {
+        return next.isNetworkConnection();
+    }
+
+    @Override
+    public boolean isSlow() {
+        return next.isSlow();
+    }
+
+    @Override
+    public Response service(Command command) {
+        return next.service(command);
+    }
+
+    @Override
+    public void serviceException(Throwable error) {
+        next.serviceException(error);
+    }
+
+    @Override
+    public void serviceExceptionAsync(IOException e) {
+        next.serviceExceptionAsync(e);
+    }
+
+    @Override
+    public void start() throws Exception {
+        next.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        next.stop();
+    }
+
+    @Override
+    public void updateClient(ConnectionControl control) {
+        next.updateClient(control);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
new file mode 100644
index 0000000..1a8e78b
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.*;
+import org.apache.activemq.partition.dto.Partitioning;
+import org.apache.activemq.partition.dto.Target;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A BrokerFilter which partitions client connections over a cluster of brokers.
+ *
+ * It can use a client identifier like client id, authenticated user name, source ip
+ * address or even destination being used by the connection to figure out which
+ * is the best broker in the cluster that the connection should be using and then
+ * redirects failover clients to that broker.
+ */
+public class PartitionBroker extends BrokerFilter {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
+    protected final PartitionBrokerPlugin plugin;
+    protected boolean reloadConfigOnPoll = true;
+
+    public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
+        super(broker);
+        this.plugin = plugin;
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                Thread.currentThread().setName("Partition Monitor");
+                onMonitorStart();
+                try {
+                    runPartitionMonitor();
+                } catch (Exception e) {
+                    onMonitorStop();
+                }
+            }
+        });
+    }
+
+    protected void onMonitorStart() {
+    }
+    protected void onMonitorStop() {
+    }
+
+    protected void runPartitionMonitor() {
+        while( !isStopped() ) {
+            try {
+                monitorWait();
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if(reloadConfigOnPoll) {
+                try {
+                    reloadConfiguration();
+                } catch (Exception e) {
+                    continue;
+                }
+            }
+
+            for( ConnectionMonitor monitor: monitors.values()) {
+                checkTarget(monitor);
+            }
+        }
+    }
+
+    protected void monitorWait() throws InterruptedException {
+        synchronized (this) {
+            this.wait(1000);
+        }
+    }
+
+    protected void monitorWakeup()  {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    protected void reloadConfiguration() throws Exception {
+    }
+
+    protected void checkTarget(ConnectionMonitor monitor) {
+
+        // can we find a preferred target for the connection?
+        Target targetDTO = pickBestBroker(monitor);
+        if( targetDTO == null || targetDTO.ids==null) {
+            LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
+            return;
+        }
+
+        // Are we one the the targets?
+        if( targetDTO.ids.contains(getBrokerName()) ) {
+            LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
+            return;
+        }
+
+        // Then we need to move the connection over.
+        String connectionString = getConnectionString(targetDTO.ids);
+        if( connectionString==null ) {
+            LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
+        }
+
+        LOG.info("Redirecting connection to: " + connectionString);
+        TransportConnection connection = (TransportConnection)monitor.next;
+        ConnectionControl cc = new ConnectionControl();
+        cc.setConnectedBrokers(connectionString);
+        cc.setRebalanceConnection(true);
+        connection.dispatchAsync(cc);
+    }
+
+    protected String getConnectionString(HashSet<String> ids) {
+        if( getConfig().brokers==null || getConfig().brokers.isEmpty() )
+            return null;
+        StringBuilder rc = new StringBuilder();
+        for (String id : ids) {
+            String url = getConfig().brokers.get(id);
+            if( url!=null ) {
+                if( rc.length()!=0 ) {
+                    rc.append(',');
+                }
+                rc.append(url);
+            }
+        }
+        return rc.toString();
+    }
+
+    protected Target pickBestBroker(ConnectionMonitor monitor) {
+
+        if( getConfig() ==null )
+            return null;
+
+        if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
+            TransportConnection connection = (TransportConnection)monitor.context.getConnection();
+            Transport transport = connection.getTransport();
+            Socket socket = transport.narrow(Socket.class);
+            if( socket !=null ) {
+                SocketAddress address = socket.getRemoteSocketAddress();
+                if( address instanceof InetSocketAddress) {
+                    String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
+                    Target targetDTO = getConfig().bySourceIp.get(ip);
+                    if( targetDTO!=null ) {
+                        return targetDTO;
+                    }
+                }
+            }
+        }
+
+        if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
+            String userName = monitor.context.getUserName();
+            if( userName !=null ) {
+                Target targetDTO = getConfig().byUserName.get(userName);
+                if( targetDTO!=null ) {
+                    return targetDTO;
+                }
+            }
+        }
+
+        if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
+            String clientId = monitor.context.getClientId();
+            if( clientId!=null ) {
+                Target targetDTO = getConfig().byClientId.get(clientId);
+                if( targetDTO!=null ) {
+                    return targetDTO;
+                }
+            }
+        }
+
+        if(
+             (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
+          || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
+          ) {
+
+            ActiveMQDestination best = monitor.findMostActiveDestination(plugin);
+            if( best!=null ) {
+                if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty() && best.isQueue() ) {
+                    Target targetDTO = getConfig().byQueue.get(best.getPhysicalName());
+                    if( targetDTO!=null ) {
+                        return targetDTO;
+                    }
+                }
+
+                if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty() && best.isTopic() ) {
+                    Target targetDTO = getConfig().byTopic.get(best.getPhysicalName());
+                    if( targetDTO!=null ) {
+                        return targetDTO;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
+
+    @Override
+    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
+        ConnectionMonitor monitor = new ConnectionMonitor(context);
+        context.setConnection(monitor);
+        monitors.put(info.getConnectionId(), monitor);
+        super.addConnection(context, info);
+        checkTarget(monitor);
+    }
+
+    @Override
+    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
+        super.removeConnection(context, info, error);
+        ConnectionMonitor removed = monitors.remove(info.getConnectionId());
+        if( removed!=null ) {
+            context.setConnection(removed.next);
+        }
+    }
+
+    @Override
+    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+        ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
+        if( monitor!=null ) {
+            monitor.onSend(producerExchange, messageSend);
+        }
+    }
+
+    protected Partitioning getConfig() {
+        return plugin.getConfig();
+    }
+
+
+    static class Traffic {
+        long messages;
+        long bytes;
+    }
+
+    static class ConnectionMonitor extends ConnectionProxy {
+        final ConnectionContext context;
+
+        LRUCache<ActiveMQDestination, Traffic> trafficPerDestination =  new LRUCache<ActiveMQDestination, Traffic>();
+
+        ConnectionMonitor(ConnectionContext context) {
+            super(context.getConnection());
+            this.context = context;
+        }
+
+        synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin plugin) {
+            ActiveMQDestination best = null;
+            long bestSize = 0 ;
+            for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
+                Traffic t = entry.getValue();
+                // Once we get enough messages...
+                if( t.messages < plugin.getMinTransferCount()) {
+                    continue;
+                }
+                if( t.bytes > bestSize) {
+                    bestSize = t.bytes;
+                    best = entry.getKey();
+                }
+            }
+            return best;
+        }
+
+        synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
+            ActiveMQDestination dest = message.getDestination();
+            Traffic traffic = trafficPerDestination.get(dest);
+            if( traffic == null ) {
+                traffic = new Traffic();
+                trafficPerDestination.put(dest, traffic);
+            }
+            traffic.messages += 1;
+            traffic.bytes += message.getSize();
+        }
+
+
+        @Override
+        public void dispatchAsync(Command command) {
+            if (command.getClass() == MessageDispatch.class) {
+                MessageDispatch md = (MessageDispatch) command;
+                Message message = md.getMessage();
+                synchronized (this) {
+                    ActiveMQDestination dest = md.getDestination();
+                    Traffic traffic = trafficPerDestination.get(dest);
+                    if( traffic == null ) {
+                        traffic = new Traffic();
+                        trafficPerDestination.put(dest, traffic);
+                    }
+                    traffic.messages += 1;
+                    traffic.bytes += message.getSize();
+                }
+            }
+            super.dispatchAsync(command);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
new file mode 100644
index 0000000..936d2ea
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.partition.dto.Partitioning;
+
+/**
+ * A BrokerPlugin which partitions client connections over a cluster of brokers.
+ *
+ * @org.apache.xbean.XBean element="partitionBrokerPlugin"
+ */
+public class PartitionBrokerPlugin implements BrokerPlugin {
+
+    protected int minTransferCount;
+    protected Partitioning config;
+
+    @Override
+    public Broker installPlugin(Broker broker) throws Exception {
+        return new PartitionBroker(broker, this);
+    }
+
+    public int getMinTransferCount() {
+        return minTransferCount;
+    }
+
+    public void setMinTransferCount(int minTransferCount) {
+        this.minTransferCount = minTransferCount;
+    }
+
+    public Partitioning getConfig() {
+        return config;
+    }
+
+    public void setConfig(Partitioning config) {
+        this.config = config;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
new file mode 100644
index 0000000..2c18f2d
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.leveldb.replicated.groups.ZKClient;
+import org.apache.activemq.partition.dto.Partitioning;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.linkedin.util.clock.Timespan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ZooKeeperPartitionBroker extends PartitionBroker {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
+
+    protected volatile ZKClient zk_client = null;
+    protected volatile Partitioning config;
+
+    public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
+        super(broker, plugin);
+    }
+
+
+    @Override
+    protected void onMonitorStop() {
+        zkDisconnect();
+    }
+
+    @Override
+    protected Partitioning getConfig() {
+        return config;
+    }
+
+    protected ZooKeeperPartitionBrokerPlugin plugin() {
+        return (ZooKeeperPartitionBrokerPlugin)plugin;
+    }
+
+    protected void zkConnect() throws Exception {
+        zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
+        if( plugin().getZkPassword()!=null ) {
+            zk_client.setPassword(plugin().getZkPassword());
+        }
+        zk_client.start();
+        zk_client.waitForConnected(Timespan.parse("30s"));
+    }
+
+    protected void zkDisconnect() {
+        if( zk_client!=null ) {
+            zk_client.close();
+            zk_client = null;
+        }
+    }
+
+    protected void reloadConfiguration() throws Exception {
+        if( zk_client==null )  {
+            LOG.debug("Connecting to ZooKeeper");
+            try {
+                zkConnect();
+                LOG.debug("Connected to ZooKeeper");
+            } catch (Exception e) {
+                LOG.debug("Connection to ZooKeeper failed: "+e);
+                zkDisconnect();
+                throw e;
+            }
+        }
+
+        byte[] data = null;
+        try {
+            Stat stat = new Stat();
+            data = zk_client.getData(plugin().getZkPath(), new Watcher() {
+                @Override
+                public void process(WatchedEvent watchedEvent) {
+                    try {
+                        reloadConfiguration();
+                    } catch (Exception e) {
+                    }
+                    monitorWakeup();
+                }
+            }, stat);
+            reloadConfigOnPoll = false;
+        } catch (Exception e) {
+            LOG.warn("Could load partitioning configuration: " + e, e);
+            reloadConfigOnPoll = true;
+        }
+
+        try {
+            config = Partitioning.MAPPER.readValue(data, Partitioning.class);
+        } catch (Exception e) {
+            LOG.warn("Invalid partitioning configuration: " + e, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
new file mode 100644
index 0000000..34fa0fc
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerPlugin;
+
+/**
+ * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
+ */
+public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
+
+    String zkAddress = "127.0.0.1:2181";
+    String zkPassword;
+    String zkPath = "/broker-assignments";
+    String zkSessionTmeout = "10s";
+
+    @Override
+    public Broker installPlugin(Broker broker) throws Exception {
+        return new ZooKeeperPartitionBroker(broker, this);
+    }
+
+    public String getZkAddress() {
+        return zkAddress;
+    }
+
+    public void setZkAddress(String zkAddress) {
+        this.zkAddress = zkAddress;
+    }
+
+    public String getZkPassword() {
+        return zkPassword;
+    }
+
+    public void setZkPassword(String zkPassword) {
+        this.zkPassword = zkPassword;
+    }
+
+    public String getZkPath() {
+        return zkPath;
+    }
+
+    public void setZkPath(String zkPath) {
+        this.zkPath = zkPath;
+    }
+
+    public String getZkSessionTmeout() {
+        return zkSessionTmeout;
+    }
+
+    public void setZkSessionTmeout(String zkSessionTmeout) {
+        this.zkSessionTmeout = zkSessionTmeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
new file mode 100644
index 0000000..299d73b
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * The main Configuration class for the PartitionBroker plugin
+ *
+ * @org.apache.xbean.XBean element="partitioning"
+ */
+public class Partitioning {
+
+    static final public ObjectMapper MAPPER = new ObjectMapper();
+    static {
+        MAPPER.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+        MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
+    }
+
+    static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
+    static {
+        TO_STRING_MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
+        TO_STRING_MAPPER.enable(SerializationConfig.Feature.INDENT_OUTPUT);
+    }
+
+    /**
+     * If a client connects with a clientId which is listed in the
+     * map, then he will be immediately reconnected
+     * to the partition target immediately.
+     */
+    @JsonProperty("by_client_id")
+    public HashMap<String, Target> byClientId;
+
+    /**
+     * If a client connects with a user priciple which is listed in the
+     * map, then he will be immediately reconnected
+     * to the partition target immediately.
+     */
+    @JsonProperty("by_user_name")
+    public HashMap<String, Target> byUserName;
+
+    /**
+     * If a client connects with source ip which is listed in the
+     * map, then he will be immediately reconnected
+     * to the partition target immediately.
+     */
+    @JsonProperty("by_source_ip")
+    public HashMap<String, Target> bySourceIp;
+
+    /**
+     * Used to map the preferred partitioning of queues across
+     * a set of brokers.  Once a it is deemed that a connection mostly
+     * works with a set of targets configured in this map, the client
+     * will be reconnected to the appropriate target.
+     */
+    @JsonProperty("by_queue")
+    public HashMap<String, Target> byQueue;
+
+    /**
+     * Used to map the preferred partitioning of topics across
+     * a set of brokers.  Once a it is deemed that a connection mostly
+     * works with a set of targets configured in this map, the client
+     * will be reconnected to the appropriate target.
+     */
+    @JsonProperty("by_topic")
+    public HashMap<String, Target> byTopic;
+
+    /**
+     * Maps broker names to broker URLs.
+     */
+    @JsonProperty("brokers")
+    public HashMap<String, String> brokers;
+
+
+    @Override
+    public String toString() {
+        try {
+            return TO_STRING_MAPPER.writeValueAsString(this);
+        } catch (IOException e) {
+            return super.toString();
+        }
+    }
+
+    public HashMap<String, String> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(HashMap<String, String> brokers) {
+        this.brokers = brokers;
+    }
+
+    public HashMap<String, Target> getByClientId() {
+        return byClientId;
+    }
+
+    public void setByClientId(HashMap<String, Target> byClientId) {
+        this.byClientId = byClientId;
+    }
+
+    public HashMap<String, Target> getByQueue() {
+        return byQueue;
+    }
+
+    public void setByQueue(HashMap<String, Target> byQueue) {
+        this.byQueue = byQueue;
+    }
+
+    public HashMap<String, Target> getBySourceIp() {
+        return bySourceIp;
+    }
+
+    public void setBySourceIp(HashMap<String, Target> bySourceIp) {
+        this.bySourceIp = bySourceIp;
+    }
+
+    public HashMap<String, Target> getByTopic() {
+        return byTopic;
+    }
+
+    public void setByTopic(HashMap<String, Target> byTopic) {
+        this.byTopic = byTopic;
+    }
+
+    public HashMap<String, Target> getByUserName() {
+        return byUserName;
+    }
+
+    public void setByUserName(HashMap<String, Target> byUserName) {
+        this.byUserName = byUserName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
new file mode 100644
index 0000000..b6f0ee2
--- /dev/null
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import scala.actors.threadpool.Arrays;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * Represents a partition target.  This identifies the brokers that
+ * a partition lives on.
+ *
+ * @org.apache.xbean.XBean element="target"
+ */
+public class Target {
+
+    @JsonProperty("ids")
+    public HashSet<String> ids = new HashSet<String>();
+
+    public Target() {
+        ids = new HashSet<String>();
+    }
+
+    public Target(String ...ids) {
+        this.ids.addAll(Arrays.asList(ids));
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
+        } catch (IOException e) {
+            return super.toString();
+        }
+    }
+
+    public HashSet<String> getIds() {
+        return ids;
+    }
+
+    public void setIds(Collection<String> ids) {
+        this.ids = new HashSet<String>(ids);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
new file mode 100644
index 0000000..9b7450c
--- /dev/null
+++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.partition;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.partition.dto.Partitioning;
+import org.apache.activemq.partition.dto.Target;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for the PartitionBroker plugin.
+ */
+public class PartitionBrokerTest extends AutoFailTestSupport {
+
+    protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
+    protected ArrayList<Connection> connections = new ArrayList<Connection>();
+    Partitioning partitioning;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        partitioning = new Partitioning();
+        partitioning.brokers = new HashMap<String, String>();
+    }
+
+
+    public void testPartitionByClientId() throws Exception {
+        partitioning.byClientId = new HashMap<String, Target>();
+        partitioning.byClientId.put("client1", new Target("broker1"));
+        partitioning.byClientId.put("client2", new Target("broker2"));
+        createBrokerCluster(2);
+
+        Connection connection = createConnectionTo("broker2");
+
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(0, getTransportConnector("broker1").getConnections().size());
+                assertEquals(1, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+
+        connection.setClientID("client1");
+        connection.start();
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(1, getTransportConnector("broker1").getConnections().size());
+                assertEquals(0, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+    }
+
+    public void testPartitionByQueue() throws Exception {
+        partitioning.byQueue = new HashMap<String, Target>();
+        partitioning.byQueue.put("foo", new Target("broker1"));
+        createBrokerCluster(2);
+
+        Connection connection = createConnectionTo("broker2");
+
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(0, getTransportConnector("broker1").getConnections().size());
+                assertEquals(1, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(session.createQueue("foo"));
+        for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("#"+i));
+        }
+
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(1, getTransportConnector("broker1").getConnections().size());
+                assertEquals(0, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+    }
+    static interface Task {
+        public void run() throws Exception;
+    }
+
+    private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
+        long timeMS = unit.toMillis(time);
+        long deadline = System.currentTimeMillis() + timeMS;
+        while (true) {
+            try {
+                task.run();
+                return;
+            } catch (Throwable e) {
+                long remaining = deadline - System.currentTimeMillis();
+                if( remaining <=0 ) {
+                    if( e instanceof RuntimeException ) {
+                        throw (RuntimeException)e;
+                    }
+                    if( e instanceof Error ) {
+                        throw (Error)e;
+                    }
+                    throw new RuntimeException(e);
+                }
+                Thread.sleep(Math.min(timeMS/10, remaining));
+            }
+        }
+    }
+
+    protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
+        String url = "failover://(" + getConnectURL(brokerId) + ")";
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+        Connection connection = factory.createConnection();
+        connections.add(connection);
+        return connection;
+    }
+
+    protected String getConnectURL(String broker) throws IOException, URISyntaxException {
+        TransportConnector tcp = getTransportConnector(broker);
+        return tcp.getConnectUri().toString();
+    }
+
+    private TransportConnector getTransportConnector(String broker) {
+        BrokerService brokerService = brokers.get(broker);
+        if( brokerService==null ) {
+            throw new IllegalArgumentException("Invalid broker id");
+        }
+        return brokerService.getTransportConnectorByName("tcp");
+    }
+
+    protected void createBrokerCluster(int brokerCount) throws Exception {
+        for (int i = 1; i <= brokerCount; i++) {
+            String brokerId = "broker" + i;
+            BrokerService broker = createBroker(brokerId);
+            broker.setPersistent(false);
+            PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
+            plugin.setConfig(partitioning);
+            broker.setPlugins(new BrokerPlugin[]{plugin});
+            broker.addConnector("tcp://localhost:0").setName("tcp");
+            broker.start();
+            broker.waitUntilStarted();
+            partitioning.brokers.put(brokerId, getConnectURL(brokerId));
+        }
+    }
+
+    protected BrokerService createBroker(String name) {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(name);
+        brokers.put(name, broker);
+        return broker;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        for (Connection connection : connections) {
+            try {
+                connection.close();
+            } catch (Throwable e) {
+            }
+        }
+        connections.clear();
+        for (BrokerService broker : brokers.values()) {
+            try {
+                broker.stop();
+                broker.waitUntilStopped();
+            } catch (Throwable e) {
+            }
+        }
+        brokers.clear();
+        super.tearDown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/activemq-spring/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml
index 9136c05..36e2c55 100755
--- a/activemq-spring/pom.xml
+++ b/activemq-spring/pom.xml
@@ -237,6 +237,7 @@
                 <include>${basedir}/../activemq-kahadb-store/src/main/java</include>
                 <include>${basedir}/../activemq-mqtt/src/main/java</include>
                 <include>${basedir}/../activemq-stomp/src/main/java</include>
+                <include>${basedir}/../activemq-partition/src/main/java</include>
                 <include>${basedir}/../activemq-runtime-config/src/main/java</include>
               </includes>
               <strictXsdOrder>false</strictXsdOrder>

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c63788e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c293ffd..19b6c64 100755
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,7 @@
     <module>activemq-runtime-config</module>
     <module>activemq-tooling</module>
     <module>activemq-web</module>
+    <module>activemq-partition</module>
     <module>activemq-osgi</module>
     <module>activemq-blueprint</module>
     <module>activemq-web-demo</module>


Mime
View raw message