activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4719
Date Wed, 11 Sep 2013 08:21:27 GMT
Updated Branches:
  refs/heads/trunk 0ff359341 -> 16c1627ca


Fix for https://issues.apache.org/jira/browse/AMQ-4719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/16c1627c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/16c1627c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/16c1627c

Branch: refs/heads/trunk
Commit: 16c1627ca0324b255b1dcd28105984d1f7faeabb
Parents: 0ff3593
Author: Rob Davies <rajdavies@gmail.com>
Authored: Wed Sep 11 09:21:06 2013 +0100
Committer: Rob Davies <rajdavies@gmail.com>
Committed: Wed Sep 11 09:21:06 2013 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/broker/ConnectionContext.java |  4 ++++
 .../java/org/apache/activemq/broker/Connector.java    |  6 ++++++
 .../apache/activemq/broker/TransportConnector.java    | 10 ++++++++++
 .../apache/activemq/broker/region/RegionBroker.java   | 14 ++++++++++++--
 4 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/16c1627c/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
index 93064c4..8e1d36e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
@@ -332,4 +332,8 @@ public class ConnectionContext {
     public XATransactionId getXid() {
         return xid;
     }
+
+    public boolean isAllowLinkStealing(){
+       return connector != null && connector.isAllowLinkStealing();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c1627c/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
index 17d0aee..1dd7f17 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Connector.java
@@ -60,4 +60,10 @@ public interface Connector extends Service {
     public boolean  isUpdateClusterClientsOnRemove();
 
     int connectionCount();
+
+    /**
+     * If enabled, older connections with the same clientID are stopped
+     * @return true/false if link stealing is enabled
+     */
+    boolean isAllowLinkStealing();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c1627c/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index b481864..7c78a44 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -74,6 +74,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
     private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
     private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
     private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
+    private boolean allowLinkStealing;
 
     LinkedList<String> peerBrokers = new LinkedList<String>();
 
@@ -573,6 +574,15 @@ public class TransportConnector implements Connector, BrokerServiceAware {
         return connections.size();
     }
 
+    @Override
+    public boolean isAllowLinkStealing() {
+        return allowLinkStealing;
+    }
+
+    public void setAllowLinkStealing (boolean allowLinkStealing) {
+        this.allowLinkStealing=allowLinkStealing;
+    }
+
     public boolean isAuditNetworkProducers() {
         return auditNetworkProducers;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c1627c/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index e4316f2..65634b7 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -231,8 +231,18 @@ public class RegionBroker extends EmptyBroker {
         synchronized (clientIdSet) {
             ConnectionContext oldContext = clientIdSet.get(clientId);
             if (oldContext != null) {
-                throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
-                    + oldContext.getConnection().getRemoteAddress());
+                if (context.isAllowLinkStealing()){
+                     clientIdSet.remove(clientId);
+                     if (oldContext.getConnection() != null) {
+                        LOG.warn("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection());
+                        oldContext.getConnection().stop();
+                     }else{
+                         LOG.error("Not Connection for " + oldContext);
+                     }
+                }else{
+                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+                            + oldContext.getConnection().getRemoteAddress());
+                }
             } else {
                 clientIdSet.put(clientId, context);
             }


Mime
View raw message