activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r378145 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridge.java NetworkConnector.java
Date Thu, 16 Feb 2006 02:49:50 GMT
Author: chirino
Date: Wed Feb 15 18:49:48 2006
New Revision: 378145

URL: http://svn.apache.org/viewcvs?rev=378145&view=rev
Log:
http://jira.activemq.org/jira/browse/AMQ-505


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378145&r1=378144&r2=378145&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java	Wed Feb 15 18:49:48 2006
@@ -81,6 +81,7 @@
     BrokerId localBrokerId;
     BrokerId remoteBrokerId;
     private Object brokerInfoMutex = new Object();
+    
     private static class DemandSubscription{
         ConsumerInfo remoteInfo;
         ConsumerInfo localInfo;
@@ -91,11 +92,13 @@
             localInfo=info.copy();
         }
     }
+    
     ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
     ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
     protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
     protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
     private CountDownLatch startedLatch = new CountDownLatch(2);
+    private boolean decreaseNetowrkConsumerPriority;
 
     public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
         this.localBroker=localBroker;
@@ -289,12 +292,16 @@
                             .getNextSequenceId()));
             sub.localInfo.setDispatchAsync(dispatchAsync);
             sub.localInfo.setPrefetchSize(prefetchSize);
-            byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-            if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
-                // The longer the path to the consumer, the less it's consumer priority.
-                priority-=info.getBrokerPath().length+1;
+            
+            if( decreaseNetowrkConsumerPriority ) {
+                byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+                if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
+                    // The longer the path to the consumer, the less it's consumer priority.
+                    priority-=info.getBrokerPath().length+1;
+                }
+                sub.localInfo.setPriority(priority);
             }
-            sub.localInfo.setPriority(priority);
+            
             subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
             subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
             sub.localInfo.setBrokerPath(info.getBrokerPath());
@@ -472,5 +479,13 @@
     
     private void waitStarted() throws InterruptedException {
         startedLatch.await();
+    }
+
+    public boolean isDecreaseNetowrkConsumerPriority() {
+        return decreaseNetowrkConsumerPriority;
+    }
+
+    public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) {
+        this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378145&r1=378144&r2=378145&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java	Wed Feb 15 18:49:48 2006
@@ -22,7 +22,6 @@
 import java.util.Set;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -49,8 +48,8 @@
 
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
     private Set durableDestinations;
-    boolean failover=true;
-    
+    private boolean failover=true;
+    private boolean decreaseNetowrkConsumerPriority;
     
     public NetworkConnector(){
         
@@ -196,6 +195,7 @@
                 }
             }
         };
+        result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
         result.setLocalBrokerName(brokerName);
         return result;
     }
@@ -240,6 +240,16 @@
      */
     public void setDurableDestinations(Set durableDestinations){
         this.durableDestinations=durableDestinations;
+    }
+
+
+    public boolean isDecreaseNetowrkConsumerPriority() {
+        return decreaseNetowrkConsumerPriority;
+    }
+
+
+    public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority) {
+        this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
     }
 
 }



Mime
View raw message