activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r378968 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: ConduitBridge.java DemandSubscription.java DurableConduitBridge.java NetworkConnector.java
Date Sun, 19 Feb 2006 22:14:35 GMT
Author: rajdavies
Date: Sun Feb 19 14:14:33 2006
New Revision: 378968

URL: http://svn.apache.org/viewcvs?rev=378968&view=rev
Log:
fix some issues with assembly tests

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Sun Feb 19 14:14:33 2006
@@ -45,6 +45,14 @@
     }
     
     protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+        
+        if (addToAlreadyInterestedConsumers(info)){
+            return null; //don't want this subscription added
+        }
+        return doCreateDemandSubscription(info);
+    }
+    
+    protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){
         //search through existing subscriptions and see if we have a match
         boolean matched = false;
         DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
@@ -57,18 +65,7 @@
                 //continue - we want interest to any existing DemandSubscriptions
             }
         }
-        if (matched){
-            return null; //don't want this subscription added
-        }
-        //not matched so create a new one
-        //but first, if it's durable - changed set the
-        //ConsumerId here - so it won't be removed if the
-        //durable subscriber goes away on the other end
-        if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
-            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
-                            .getNextSequenceId()));
-        }
-        return super.createDemandSubscription(info);
+        return matched;
     }
     
     protected void removeDemandSubscription(ConsumerId id) throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Sun Feb 19 14:14:33 2006
@@ -40,7 +40,6 @@
         remoteInfo=info;
         localInfo=info.copy();
         localInfo.setBrokerPath(info.getBrokerPath());
-        localInfo.setNetworkSubscription(true);
         remoteSubsIds.add(info.getConsumerId());
     } 
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Sun Feb 19 14:14:33 2006
@@ -1,28 +1,27 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.util.Iterator;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
 /**
  * Consolidates subscriptions
  * 
@@ -30,29 +29,37 @@
  */
 public class DurableConduitBridge extends ConduitBridge{
     static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+
     /**
      * Constructor
+     * 
      * @param localBroker
      * @param remoteBroker
      */
     public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
         super(localBroker,remoteBroker);
     }
-    
+
     /**
      * Subscriptions for these desitnations are always created
-     * @throws IOException 
-     *
+     * 
      */
-    protected void setupStaticDestinations() throws IOException{
+    protected void setupStaticDestinations(){
         super.setupStaticDestinations();
         ActiveMQDestination[] dests=durableDestinations;
         if(dests!=null){
             for(int i=0;i<dests.length;i++){
                 ActiveMQDestination dest=dests[i];
-                if(isPermissableDestination(dest)){
+                if(isPermissableDestination(dest) && !doesConsumerExist(dest)){
                     DemandSubscription sub=createDemandSubscription(dest);
-                    addSubscription(sub);
+                    if(dest.isTopic()){
+                        sub.getLocalInfo().setSubcriptionName(getLocalBrokerName());
+                    }
+                    try{
+                        addSubscription(sub);
+                    }catch(IOException e){
+                        log.error("Failed to add static destination "+dest,e);
+                    }
                     if(log.isTraceEnabled())
                         log.trace("Forwarding messages for durable destination: "+dest);
                 }
@@ -60,4 +67,32 @@
         }
     }
 
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+        if(addToAlreadyInterestedConsumers(info)){
+            return null; // don't want this subscription added
+        }
+        // not matched so create a new one
+        // but first, if it's durable - changed set the
+        // ConsumerId here - so it won't be removed if the
+        // durable subscriber goes away on the other end
+        if(info.isDurable()||(info.getDestination().isQueue()&&!info.getDestination().isTemporary())){
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator.getNextSequenceId()));
+        }
+        if(info.isDurable()){
+            // set the subscriber name to something reproducable
+            info.setSubcriptionName(getLocalBrokerName());
+        }
+        return doCreateDemandSubscription(info);
+    }
+
+    protected boolean doesConsumerExist(ActiveMQDestination dest){
+        DestinationFilter filter=DestinationFilter.parseFilter(dest);
+        for(Iterator i=subscriptionMapByLocalId.values().iterator();i.hasNext();){
+            DemandSubscription ds=(DemandSubscription) i.next();
+            if(filter.matches(ds.getLocalInfo().getDestination())){
+                return true;
+            }
+        }
+        return false;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378968&r1=378967&r2=378968&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Sun Feb 19 14:14:33 2006
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.activemq.Service;
@@ -82,6 +83,10 @@
 
     public void stop() throws Exception {
         this.discoveryAgent.stop();
+        for (Iterator i = bridges.values().iterator();i.hasNext();){
+            Bridge bridge = (Bridge)i.next();
+            bridge.stop();
+        }
     }
 
     public void onServiceAdd(DiscoveryEvent event) {



Mime
View raw message