activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Implementing AMQ-4788 - Lets give preference to consumers vs producers.
Date Mon, 07 Oct 2013 16:41:18 GMT
Updated Branches:
  refs/heads/trunk 1981adf42 -> f88f2803a


Implementing AMQ-4788 - Lets give preference to consumers vs producers.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f88f2803
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f88f2803
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f88f2803

Branch: refs/heads/trunk
Commit: f88f2803ac0a2a984137caa86ca9ef2b1199fbe9
Parents: 1981adf
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Mon Oct 7 12:40:28 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Mon Oct 7 12:41:16 2013 -0400

----------------------------------------------------------------------
 .../activemq/partition/ConnectionProxy.java     | 138 -------------------
 .../activemq/partition/PartitionBroker.java     | 112 +++++++++------
 .../activemq/partition/PartitionBrokerTest.java |  36 +++--
 3 files changed, 95 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f88f2803/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
deleted file mode 100644
index cab6eb6..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Connection;
-import org.apache.activemq.broker.Connector;
-import org.apache.activemq.broker.region.ConnectionStatistics;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.Response;
-
-import java.io.IOException;
-
-/**
- * A Connection implementation that proxies all Connection invocation to
- * a delegate connection.
- */
-public class ConnectionProxy implements Connection {
-    final Connection next;
-
-    public ConnectionProxy(Connection next) {
-        this.next = next;
-    }
-
-    @Override
-    public void dispatchAsync(Command command) {
-        next.dispatchAsync(command);
-    }
-
-    @Override
-    public void dispatchSync(Command message) {
-        next.dispatchSync(message);
-    }
-
-    @Override
-    public String getConnectionId() {
-        return next.getConnectionId();
-    }
-
-    @Override
-    public Connector getConnector() {
-        return next.getConnector();
-    }
-
-    @Override
-    public int getDispatchQueueSize() {
-        return next.getDispatchQueueSize();
-    }
-
-    @Override
-    public String getRemoteAddress() {
-        return next.getRemoteAddress();
-    }
-
-    @Override
-    public ConnectionStatistics getStatistics() {
-        return next.getStatistics();
-    }
-
-    @Override
-    public boolean isActive() {
-        return next.isActive();
-    }
-
-    @Override
-    public boolean isBlocked() {
-        return next.isBlocked();
-    }
-
-    @Override
-    public boolean isConnected() {
-        return next.isConnected();
-    }
-
-    @Override
-    public boolean isFaultTolerantConnection() {
-        return next.isFaultTolerantConnection();
-    }
-
-    @Override
-    public boolean isManageable() {
-        return next.isManageable();
-    }
-
-    @Override
-    public boolean isNetworkConnection() {
-        return next.isNetworkConnection();
-    }
-
-    @Override
-    public boolean isSlow() {
-        return next.isSlow();
-    }
-
-    @Override
-    public Response service(Command command) {
-        return next.service(command);
-    }
-
-    @Override
-    public void serviceException(Throwable error) {
-        next.serviceException(error);
-    }
-
-    @Override
-    public void serviceExceptionAsync(IOException e) {
-        next.serviceExceptionAsync(e);
-    }
-
-    @Override
-    public void start() throws Exception {
-        next.start();
-    }
-
-    @Override
-    public void stop() throws Exception {
-        next.stop();
-    }
-
-    @Override
-    public void updateClient(ConnectionControl control) {
-        next.updateClient(control);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f88f2803/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
index 1a8e78b..5190207 100644
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
@@ -20,6 +20,8 @@ import org.apache.activemq.broker.*;
 import org.apache.activemq.command.*;
 import org.apache.activemq.partition.dto.Partitioning;
 import org.apache.activemq.partition.dto.Target;
+import org.apache.activemq.state.ConsumerState;
+import org.apache.activemq.state.SessionState;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.LRUCache;
 import org.slf4j.Logger;
@@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -132,7 +133,7 @@ public class PartitionBroker extends BrokerFilter {
         }
 
         LOG.info("Redirecting connection to: " + connectionString);
-        TransportConnection connection = (TransportConnection)monitor.next;
+        TransportConnection connection = (TransportConnection)monitor.context.getConnection();
         ConnectionControl cc = new ConnectionControl();
         cc.setConnectedBrokers(connectionString);
         cc.setRebalanceConnection(true);
@@ -155,6 +156,10 @@ public class PartitionBroker extends BrokerFilter {
         return rc.toString();
     }
 
+    static private class Score {
+        int value;
+    }
+
     protected Target pickBestBroker(ConnectionMonitor monitor) {
 
         if( getConfig() ==null )
@@ -201,22 +206,62 @@ public class PartitionBroker extends BrokerFilter {
           || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
           ) {
 
-            ActiveMQDestination best = monitor.findMostActiveDestination(plugin);
-            if( best!=null ) {
-                if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty()
&& best.isQueue() ) {
-                    Target targetDTO = getConfig().byQueue.get(best.getPhysicalName());
-                    if( targetDTO!=null ) {
-                        return targetDTO;
+            // Collect the destinations the connection is consuming from...
+            HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
+            for (SessionState session : monitor.context.getConnectionState().getSessionStates())
{
+                for (ConsumerState consumer : session.getConsumerStates()) {
+                    ActiveMQDestination destination = consumer.getInfo().getDestination();
+                    if( destination.isComposite() ) {
+                        dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
+                    } else {
+                        dests.addAll(Collections.singletonList(destination));
                     }
                 }
+            }
 
-                if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()
&& best.isTopic() ) {
-                    Target targetDTO = getConfig().byTopic.get(best.getPhysicalName());
-                    if( targetDTO!=null ) {
-                        return targetDTO;
+            // Group them by the partitioning target for the destinations and score them..
+            HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
+            for (ActiveMQDestination dest : dests) {
+                Target target = getTarget(dest);
+                if( target!=null ) {
+                    Score score = targetScores.get(target);
+                    if( score == null ) {
+                        score = new Score();
+                        targetScores.put(target, score);
                     }
+                    score.value++;
                 }
             }
+
+            // The target with largest score wins..
+            if( !targetScores.isEmpty() ) {
+                Target bestTarget = null;
+                int bestScore=0;
+                for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
+                    if( entry.getValue().value > bestScore ) {
+                        bestTarget = entry.getKey();
+                    }
+                }
+                return bestTarget;
+            }
+
+            // If we get here is because there were no consumers, or the destinations for
those
+            // consumers did not have an assigned destination..  So partition based on producer
+            // usage.
+            Target best = monitor.findBestProducerTarget(this);
+            if( best!=null ) {
+                return best;
+            }
+        }
+        return null;
+    }
+
+    protected Target getTarget(ActiveMQDestination dest) {
+        Partitioning config = getConfig();
+        if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty()
) {
+            return config.byQueue.get(dest.getPhysicalName());
+        } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty()
) {
+            return config.byTopic.get(dest.getPhysicalName());
         }
         return null;
     }
@@ -226,7 +271,6 @@ public class PartitionBroker extends BrokerFilter {
     @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception
{
         ConnectionMonitor monitor = new ConnectionMonitor(context);
-        context.setConnection(monitor);
         monitors.put(info.getConnectionId(), monitor);
         super.addConnection(context, info);
         checkTarget(monitor);
@@ -236,9 +280,6 @@ public class PartitionBroker extends BrokerFilter {
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable
error) throws Exception {
         super.removeConnection(context, info, error);
         ConnectionMonitor removed = monitors.remove(info.getConnectionId());
-        if( removed!=null ) {
-            context.setConnection(removed.next);
-        }
     }
 
     @Override
@@ -259,28 +300,30 @@ public class PartitionBroker extends BrokerFilter {
         long bytes;
     }
 
-    static class ConnectionMonitor extends ConnectionProxy {
-        final ConnectionContext context;
+    static class ConnectionMonitor {
 
+        final ConnectionContext context;
         LRUCache<ActiveMQDestination, Traffic> trafficPerDestination =  new LRUCache<ActiveMQDestination,
Traffic>();
 
-        ConnectionMonitor(ConnectionContext context) {
-            super(context.getConnection());
+        public ConnectionMonitor(ConnectionContext context) {
             this.context = context;
         }
 
-        synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin
plugin) {
-            ActiveMQDestination best = null;
+        synchronized public Target findBestProducerTarget(PartitionBroker broker) {
+            Target best = null;
             long bestSize = 0 ;
             for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet())
{
                 Traffic t = entry.getValue();
                 // Once we get enough messages...
-                if( t.messages < plugin.getMinTransferCount()) {
+                if( t.messages < broker.plugin.getMinTransferCount()) {
                     continue;
                 }
                 if( t.bytes > bestSize) {
                     bestSize = t.bytes;
-                    best = entry.getKey();
+                    Target target = broker.getTarget(entry.getKey());
+                    if( target!=null ) {
+                        best = target;
+                    }
                 }
             }
             return best;
@@ -298,25 +341,6 @@ public class PartitionBroker extends BrokerFilter {
         }
 
 
-        @Override
-        public void dispatchAsync(Command command) {
-            if (command.getClass() == MessageDispatch.class) {
-                MessageDispatch md = (MessageDispatch) command;
-                Message message = md.getMessage();
-                synchronized (this) {
-                    ActiveMQDestination dest = md.getDestination();
-                    Traffic traffic = trafficPerDestination.get(dest);
-                    if( traffic == null ) {
-                        traffic = new Traffic();
-                        trafficPerDestination.put(dest, traffic);
-                    }
-                    traffic.messages += 1;
-                    traffic.bytes += message.getSize();
-                }
-            }
-            super.dispatchAsync(command);
-        }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f88f2803/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
index 9b7450c..efb5e66 100644
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
+++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
@@ -24,10 +24,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.partition.dto.Partitioning;
 import org.apache.activemq.partition.dto.Target;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -81,7 +78,7 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
         partitioning.byQueue.put("foo", new Target("broker1"));
         createBrokerCluster(2);
 
-        Connection connection = createConnectionTo("broker2");
+        Connection connection2 = createConnectionTo("broker2");
 
         within(5, TimeUnit.SECONDS, new Task() {
             public void run() throws Exception {
@@ -90,19 +87,40 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
             }
         });
 
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(session.createQueue("foo"));
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
+
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(1, getTransportConnector("broker1").getConnections().size());
+                assertEquals(0, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+
+        Connection connection1 = createConnectionTo("broker2");
+        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
+
+        within(5, TimeUnit.SECONDS, new Task() {
+            public void run() throws Exception {
+                assertEquals(1, getTransportConnector("broker1").getConnections().size());
+                assertEquals(1, getTransportConnector("broker2").getConnections().size());
+            }
+        });
+
         for (int i = 0; i < 100; i++) {
-            producer.send(session.createTextMessage("#"+i));
+            producer.send(session1.createTextMessage("#" + i));
         }
 
         within(5, TimeUnit.SECONDS, new Task() {
             public void run() throws Exception {
-                assertEquals(1, getTransportConnector("broker1").getConnections().size());
+                assertEquals(2, getTransportConnector("broker1").getConnections().size());
                 assertEquals(0, getTransportConnector("broker2").getConnections().size());
             }
         });
     }
+
+
     static interface Task {
         public void run() throws Exception;
     }


Mime
View raw message