activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r378700 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: ConduitBridge.java DemandForwardingBridge.java DemandSubscription.java DurableConduitBridge.java ForwardingBridge.java NetworkConnector.java
Date Sat, 18 Feb 2006 07:20:18 GMT
Author: rajdavies
Date: Fri Feb 17 23:20:16 2006
New Revision: 378700

URL: http://svn.apache.org/viewcvs?rev=378700&view=rev
Log:
Add more options for networks:

included, excluded destinationm filters
durable destinations as well as dynamic
conduit subscriptions (multiple subs on smae matching destination are treated as one)
networkTTL = number of hops messages/subs can pass through - default = 1

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=378700&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,94 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.transport.Transport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Consolidates subscriptions
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class ConduitBridge extends DemandForwardingBridge{
+    static final private Log log=LogFactory.getLog(ConduitBridge.class);
+    /**
+     * Constructor
+     * @param localBroker
+     * @param remoteBroker
+     */
+    public ConduitBridge(Transport localBroker,Transport remoteBroker){
+        super(localBroker,remoteBroker);
+    }
+    
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+        //search through existing subscriptions and see if we have a match
+        boolean matched = false;
+        DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination());
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription)i.next();
+            if (filter.matches(ds.getLocalInfo().getDestination())){
+                //add the interest in the subscription
+                ds.add(ds.getRemoteInfo().getConsumerId());
+                matched = true;
+                //continue - we want interest to any existing DemandSubscriptions
+            }
+        }
+        if (matched){
+            return null; //don't want this subscription added
+        }
+        //not matched so create a new one
+        //but first, if it's durable - changed set the
+        //ConsumerId here - so it won't be removed if the
+        //durable subscriber goes away on the other end
+        if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+                            .getNextSequenceId()));
+        }
+        return super.createDemandSubscription(info);
+    }
+    
+    protected void removeDemandSubscription(ConsumerId id) throws IOException{
+        List tmpList = new ArrayList();
+    
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription)i.next();
+            ds.remove(id);
+            if (ds.isEmpty()){
+                tmpList.add(ds);
+            }
+        }
+        for (Iterator i = tmpList.iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription) i.next();
+            subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
+            removeSubscription(ds);
+            if(log.isTraceEnabled())
+                log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" :  "+ds.getRemoteInfo());
+        }
+       
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Fri Feb 17 23:20:16 2006
@@ -16,6 +16,7 @@
 import java.io.IOException;
 import javax.jms.JMSException;
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -37,9 +38,10 @@
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -59,46 +61,41 @@
  */
 public class DemandForwardingBridge implements Bridge{
     static final private Log log=LogFactory.getLog(DemandForwardingBridge.class);
-    private final Transport localBroker;
-    private final Transport remoteBroker;
-    private IdGenerator idGenerator=new IdGenerator();
-    private LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator();
-    private ConnectionInfo localConnectionInfo;
-    private ConnectionInfo remoteConnectionInfo;
-    private SessionInfo localSessionInfo;
-    private ProducerInfo producerInfo;
-    private String localBrokerName;
-    private String remoteBrokerName;
-    private String localClientId;
-    private int prefetchSize=1000;
-    private boolean dispatchAsync;
-    private String destinationFilter=">";
-    private ConsumerInfo demandConsumerInfo;
-    private int demandConsumerDispatched;
-    private AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
-    private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
-    private boolean disposed=false;
-    BrokerId localBrokerId;
-    BrokerId remoteBrokerId;
-    private Object brokerInfoMutex = new Object();
-    
-    private static class DemandSubscription{
-        ConsumerInfo remoteInfo;
-        ConsumerInfo localInfo;
-        int dispatched;
-
-        public DemandSubscription(ConsumerInfo info){
-            remoteInfo=info;
-            localInfo=info.copy();
-        }
-    }
-    
-    ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
-    ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
+    protected final Transport localBroker;
+    protected final Transport remoteBroker;
+    protected IdGenerator idGenerator=new IdGenerator();
+    protected LongSequenceGenerator consumerIdGenerator=new LongSequenceGenerator();
+    protected ConnectionInfo localConnectionInfo;
+    protected ConnectionInfo remoteConnectionInfo;
+    protected SessionInfo localSessionInfo;
+    protected ProducerInfo producerInfo;
+    protected String localBrokerName;
+    protected String remoteBrokerName;
+    protected String localClientId;
+    protected int prefetchSize=1000;
+    protected boolean dispatchAsync;
+    protected String destinationFilter=">";
+    protected ConsumerInfo demandConsumerInfo;
+    protected int demandConsumerDispatched;
+    protected AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
+    protected AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
+    protected boolean disposed=false;
+    protected BrokerId localBrokerId;
+    protected BrokerId remoteBrokerId;
+    protected ActiveMQDestination[] excludedDestinations;
+    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
+    protected ActiveMQDestination[] staticallyIncludedDestinations;
+    protected ActiveMQDestination[] durableDestinations;   
+    protected ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
+    protected ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
     protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
     protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
-    private CountDownLatch startedLatch = new CountDownLatch(2);
-    private boolean decreaseNetowrkConsumerPriority;
+    protected CountDownLatch startedLatch = new CountDownLatch(2);
+    protected Object brokerInfoMutex = new Object();
+    protected boolean decreaseNetworkConsumerPriority;
+    protected int networkTTL = 1;
+    
+    
 
     public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
         this.localBroker=localBroker;
@@ -107,7 +104,7 @@
 
     public void start() throws Exception{
         log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+"
has been established.");
-        localBroker.setTransportListener(new TransportListener(){
+        localBroker.setTransportListener(new DefaultTransportListener(){
             public void onCommand(Command command){
                 serviceLocalCommand(command);
             }
@@ -116,7 +113,7 @@
                 serviceLocalException(error);
             }
         });
-        remoteBroker.setTransportListener(new TransportListener(){
+        remoteBroker.setTransportListener(new DefaultTransportListener(){
             public void onCommand(Command command){
                 serviceRemoteCommand(command);
             }
@@ -168,6 +165,7 @@
             log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
                             +") has been established.");
             startedLatch.countDown();
+            setupStaticDestinations();
         }
     }
 
@@ -195,7 +193,13 @@
             startedLatch.countDown();
         }
     }
+    
+   
 
+    /**
+     * stop the bridge
+     * @throws Exception 
+     */
     public void stop() throws Exception{
         if(!disposed){
             try{
@@ -274,62 +278,39 @@
             // Create a new local subscription
             ConsumerInfo info=(ConsumerInfo) data;
             BrokerId[] path=info.getBrokerPath();
-            if((path!=null&&path.length>0)||info.isNetworkSubscription()){
-                // Ignore: We only support directly connected brokers for now.
+            if((path!=null&&path.length>= networkTTL)){
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring Subscription " + info + " restricted to " + networkTTL
+ " network hops only");
                 return;
             }
             if(contains(info.getBrokerPath(),localBrokerPath[0])){
                 // Ignore this consumer as it's a consumer we locally sent to the broker.
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring sub " + info + " already routed through this broker
once");
+                return;
+            }
+            if (!isPermissableDestination(info.getDestination())){
+                //ignore if not in the permited or in the excluded list
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring sub " + info + " destination " + info.getDestination()
+ " is not permiited");
                 return;
             }
-            if(log.isTraceEnabled())
-                log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+" :
 "+info);
             // Update the packet to show where it came from.
             info=info.copy();
             info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
-            DemandSubscription sub=new DemandSubscription(info);
-            sub.localInfo.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
-                            .getNextSequenceId()));
-            sub.localInfo.setDispatchAsync(dispatchAsync);
-            sub.localInfo.setPrefetchSize(prefetchSize);
-            
-            if( decreaseNetowrkConsumerPriority ) {
-                byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-                if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
-                    // The longer the path to the consumer, the less it's consumer priority.
-                    priority-=info.getBrokerPath().length+1;
-                }
-                sub.localInfo.setPriority(priority);
+            DemandSubscription sub=createDemandSubscription(info);
+            if (sub != null){
+                addSubscription(sub);
+                if(log.isTraceEnabled())
+                    log.trace("Forwarding sub on "+localBroker+" from "+remoteBrokerName+"
:  "+info);
+            }else {
+                if(log.isTraceEnabled())
+                    log.trace("Ignoring sub " + info + " already subscribed to matching destination");
             }
-            
-            subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
-            subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
-            sub.localInfo.setBrokerPath(info.getBrokerPath());
-            sub.localInfo.setNetworkSubscription(true);
-            // This works for now since we use a VM connection to the local broker.
-            // may need to change if we ever subscribe to a remote broker.
-            sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
-                public boolean matches(MessageEvaluationContext message) throws JMSException{
-                    try{
-                        return matchesForwardingFilter(message.getMessage());
-                    }catch(IOException e){
-                        throw JMSExceptionSupport.create(e);
-                    }
-                }
-
-                public Object evaluate(MessageEvaluationContext message) throws JMSException{
-                    return matches(message)?Boolean.TRUE:Boolean.FALSE;
-                }
-            });
-            localBroker.oneway(sub.localInfo);
         }
         if(data.getClass()==RemoveInfo.class){
             ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
-            DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
-            if(sub!=null){
-                subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId());
-                localBroker.oneway(sub.localInfo.createRemoveCommand());
-            }
+            removeDemandSubscription(id);
         }
     }
 
@@ -337,20 +318,39 @@
         log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown:
"+error.getMessage(),error);
         ServiceSupport.dispose(this);
     }
-
-    boolean matchesForwardingFilter(Message message){
-        if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0]))
-            return false;
-        // Don't propagate advisory messages about network subscriptions
-        if(message.isAdvisory()&&message.getDataStructure()!=null
-                        &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
-            ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
-            if(info.isNetworkSubscription()){
-                return false;
-            }
+    
+    protected void addSubscription(DemandSubscription sub) throws IOException{
+        if (sub != null){
+            localBroker.oneway(sub.getLocalInfo());
         }
-        return true;
     }
+    
+    protected void removeSubscription(DemandSubscription sub) throws IOException{
+        if(sub!=null){
+            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+        }
+    }
+    
+    protected DemandSubscription getDemandSubscription(MessageDispatch md){
+        return (DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
+    }
+    
+    protected Message configureMessage(MessageDispatch md){
+        Message message=md.getMessage().copy();
+        // Update the packet to show where it came from.
+        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
+        message.setProducerId(producerInfo.getProducerId());
+        message.setDestination(md.getDestination());
+        if(message.getOriginalTransactionId()==null)
+            message.setOriginalTransactionId(message.getTransactionId());
+        message.setTransactionId(null);
+        message.setRecievedByDFBridge(true);
+        message.evictMarshlledForm();
+        return message;
+    }
+
+    
 
     protected void serviceLocalCommand(Command command){
         if(!disposed){
@@ -359,22 +359,12 @@
                 if(command.isMessageDispatch()){
                     waitStarted();
                     MessageDispatch md=(MessageDispatch) command;
-                    Message message=md.getMessage();
                     DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
                     if(sub!=null){
-                        message=message.copy();
-                        // Update the packet to show where it came from.
-                        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
-                        message.setProducerId(producerInfo.getProducerId());
-                        message.setDestination(md.getDestination());
-                        if(message.getOriginalTransactionId()==null)
-                            message.setOriginalTransactionId(message.getTransactionId());
-                        message.setTransactionId(null);
-                        message.setRecievedByDFBridge(true);
-                        message.evictMarshlledForm();
+                        Message message= configureMessage(md);
                         if(trace)
                             log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+":
"+message);
-                        if(!message.isPersistent()||!sub.remoteInfo.isDurable()){
+                        if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
                             remoteBroker.oneway(message);
                         }else{
                             Response response=remoteBroker.request(message);
@@ -383,10 +373,10 @@
                                 serviceLocalException(er.getException());
                             }
                         }
-                        sub.dispatched++;
-                        if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){
-                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched));
-                            sub.dispatched=0;
+                        int dispatched = sub.incrementDispatched();
+                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
+                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
+                            sub.setDispatched(0);
                         }
                     }
                 }else if(command.isBrokerInfo()){
@@ -419,28 +409,91 @@
         }
     }
 
+    /**
+     * @return prefetch size
+     */
     public int getPrefetchSize(){
         return prefetchSize;
     }
 
+    
+    /**
+     * @param prefetchSize
+     */
     public void setPrefetchSize(int prefetchSize){
         this.prefetchSize=prefetchSize;
     }
 
+    
+    /**
+     * @return true if dispatch async
+     */
     public boolean isDispatchAsync(){
         return dispatchAsync;
     }
 
+    /**
+     * @param dispatchAsync
+     */
     public void setDispatchAsync(boolean dispatchAsync){
         this.dispatchAsync=dispatchAsync;
     }
+    
+    /**
+     * @return Returns the dynamicallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getDynamicallyIncludedDestinations(){
+        return dynamicallyIncludedDestinations;
+    }
+
+    /**
+     * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
+     */
+    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations){
+        this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
+    }
+
+    /**
+     * @return Returns the excludedDestinations.
+     */
+    public ActiveMQDestination[] getExcludedDestinations(){
+        return excludedDestinations;
+    }
+
+    /**
+     * @param excludedDestinations The excludedDestinations to set.
+     */
+    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations){
+        this.excludedDestinations=excludedDestinations;
+    }
 
-    public String getDestinationFilter(){
-        return destinationFilter;
+    /**
+     * @return Returns the staticallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getStaticallyIncludedDestinations(){
+        return staticallyIncludedDestinations;
     }
 
-    public void setDestinationFilter(String destinationFilter){
-        this.destinationFilter=destinationFilter;
+    /**
+     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
+     */
+    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations){
+        this.staticallyIncludedDestinations=staticallyIncludedDestinations;
+    }
+    
+
+    /**
+     * @return Returns the durableDestinations.
+     */
+    public ActiveMQDestination[] getDurableDestinations(){
+        return durableDestinations;
+    }
+
+    /**
+     * @param durableDestinations The durableDestinations to set.
+     */
+    public void setDurableDestinations(ActiveMQDestination[] durableDestinations){
+        this.durableDestinations=durableDestinations;
     }
 
     /**
@@ -457,6 +510,64 @@
     public void setLocalBrokerName(String localBrokerName){
         this.localBrokerName=localBrokerName;
     }
+    
+    /**
+     * @return Returns the localBroker.
+     */
+    public Transport getLocalBroker(){
+        return localBroker;
+    }
+
+    /**
+     * @return Returns the remoteBroker.
+     */
+    public Transport getRemoteBroker(){
+        return remoteBroker;
+    }
+
+
+    /**
+     * @return Returns the remoteBrokerName.
+     */
+    public String getRemoteBrokerName(){
+        return remoteBrokerName;
+    }
+
+    /**
+     * @param remoteBrokerName The remoteBrokerName to set.
+     */
+    public void setRemoteBrokerName(String remoteBrokerName){
+        this.remoteBrokerName=remoteBrokerName;
+    }
+    
+    /**
+     * @return Returns the decreaseNetworkConsumerPriority.
+     */
+    public boolean isDecreaseNetworkConsumerPriority(){
+        return decreaseNetworkConsumerPriority;
+    }
+
+    /**
+     * @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
+     */
+    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
+        this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
+    }
+    
+    /**
+     * @return Returns the networkTTL.
+     */
+    public int getNetworkTTL(){
+        return networkTTL;
+    }
+
+    /**
+     * @param networkTTL The networkTTL to set.
+     */
+    public void setNetworkTTL(int networkTTL){
+        this.networkTTL=networkTTL;
+    }
+
 
     private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
         if(brokerPath!=null){
@@ -477,15 +588,143 @@
         return rc;
     }
     
-    private void waitStarted() throws InterruptedException {
-        startedLatch.await();
+    
+    protected boolean isPermissableDestination(ActiveMQDestination destination){
+        DestinationFilter filter=DestinationFilter.parseFilter(destination);
+        ActiveMQDestination[] dests = excludedDestinations;
+        if(dests!=null&&dests.length>0){
+            for(int i=0;i<dests.length;i++){
+                ActiveMQDestination match=dests[i];
+                if(match!=null&&filter.matches(match)){
+                    return false;
+                }
+            }
+        }
+        dests = dynamicallyIncludedDestinations;
+        if(dests!=null&&dests.length>0){
+            for(int i=0;i<dests.length;i++){
+                ActiveMQDestination match=dests[i];
+                if(match!=null&&filter.matches(match)){
+                    return true;
+                }
+            }
+            return false;
+        }
+       
+        return true;
     }
-
-    public boolean isDecreaseNetowrkConsumerPriority() {
-        return decreaseNetowrkConsumerPriority;
+    
+    /**
+     * Subscriptions for these desitnations are always created
+     * @throws IOException 
+     *
+     */
+    protected void setupStaticDestinations() throws IOException{
+        ActiveMQDestination[] dests = staticallyIncludedDestinations;
+        if (dests != null){
+            for(int i=0;i<dests.length;i++){
+                ActiveMQDestination dest=dests[i];
+                DemandSubscription sub = createDemandSubscription(dest);
+                addSubscription(sub);
+                if(log.isTraceEnabled())
+                    log.trace("Forwarding messages for static destination: " + dest);
+            } 
+        }
     }
+    
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+        DemandSubscription result=new DemandSubscription(info);
+        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+                        .getNextSequenceId()));
+        
+        if( decreaseNetworkConsumerPriority ) {
+            byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+            if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
+                // The longer the path to the consumer, the less it's consumer priority.
+                priority-=info.getBrokerPath().length+1;
+            }
+            result.getLocalInfo().setPriority(priority);
+        }
+        configureDemandSubscription(result);
+        return result;
+    }
+    
+    protected DemandSubscription createDemandSubscription(ActiveMQDestination destination){
+        ConsumerInfo info = new ConsumerInfo();
+        info.setDestination(destination);
+        //the remote info held by the DemandSubscription holds the original consumerId,
+        //the local info get's overwritten
+        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+                        .getNextSequenceId()));
+        DemandSubscription result=new DemandSubscription(info);
+        result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
+        
+        return result;
+    }
+    
+    protected void configureDemandSubscription(DemandSubscription sub){
+        sub.getLocalInfo().setDispatchAsync(dispatchAsync);
+        sub.getLocalInfo().setPrefetchSize(prefetchSize);
+        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
+        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
+       
+        // This works for now since we use a VM connection to the local broker.
+        // may need to change if we ever subscribe to a remote broker.
+        sub.getLocalInfo().setAdditionalPredicate(new BooleanExpression(){
+            public boolean matches(MessageEvaluationContext message) throws JMSException{
+                try{
+                    return matchesForwardingFilter(message.getMessage());
+                }catch(IOException e){
+                    throw JMSExceptionSupport.create(e);
+                }
+            }
 
-    public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority)
{
-        this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
+            public Object evaluate(MessageEvaluationContext message) throws JMSException{
+                return matches(message)?Boolean.TRUE:Boolean.FALSE;
+            }
+        });
+    }
+    
+    protected void  removeDemandSubscription(ConsumerId id) throws IOException{
+        DemandSubscription sub=(DemandSubscription) subscriptionMapByRemoteId.remove(id);
+        if (sub != null){
+            removeSubscription(sub);
+            if(log.isTraceEnabled())
+                log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" :  "+sub.getRemoteInfo());
+        }
+    }
+    
+    protected boolean matchesForwardingFilter(Message message){
+        if (contains(message.getBrokerPath(),remoteBrokerPath[0])){
+            if (log.isTraceEnabled()){
+                log.trace("Message all ready routed once through this broker - ignoring:
" + message);
+            }
+        }
+        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+        if(hops >= networkTTL){
+            if (log.isTraceEnabled()){
+                log.trace("Message restricted to " + networkTTL + " network hops ignoring:
" + message);
+            }
+            return false;
+        }
+        // Don't propagate advisory messages about network subscriptions
+        if(message.isAdvisory()&&message.getDataStructure()!=null
+                        &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
+            ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
+            if(info.isNetworkSubscription()){
+                return false;
+            }
+        }
+        return true;
     }
+    
+    protected void waitStarted() throws InterruptedException {
+        startedLatch.await();
+    }
+
+    
+
+    
+
+    
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=378700&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,122 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Set;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+
+
+/**
+ * Represents a network bridge interface
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class DemandSubscription{
+    private ConsumerInfo remoteInfo;
+    private ConsumerInfo localInfo;
+    private Set remoteSubsIds = new CopyOnWriteArraySet();
+    private AtomicInteger dispatched = new AtomicInteger(0);
+
+    DemandSubscription(ConsumerInfo info){
+        remoteInfo=info;
+        localInfo=info.copy();
+        localInfo.setBrokerPath(info.getBrokerPath());
+        localInfo.setNetworkSubscription(true);
+        remoteSubsIds.add(info.getConsumerId());
+    } 
+
+    /**
+     * Increment the consumers associated with this subscription
+     * @param id
+     * @return true if added
+     */
+    public boolean add(ConsumerId id){
+        return remoteSubsIds.add(id);
+    }
+    
+    /**
+     * Increment the consumers associated with this subscription
+     * @param id
+     * @return true if added
+     */
+    public boolean remove(ConsumerId id){
+        return remoteSubsIds.remove(id);
+    }
+    
+    /**
+     * @return true if there are no interested consumers
+     */
+    public boolean isEmpty(){
+        return remoteSubsIds.isEmpty();
+    }
+    
+    
+    /**
+     * @return Returns the dispatched.
+     */
+    public int getDispatched(){
+        return dispatched.get();
+    }
+
+    /**
+     * @param dispatched The dispatched to set.
+     */
+    public void setDispatched(int dispatched){
+        this.dispatched.set(dispatched);
+    }
+    
+    /**
+     * @return dispatched count after incremented
+     */
+    public int incrementDispatched(){
+        return dispatched.incrementAndGet();
+    }
+
+    /**
+     * @return Returns the localInfo.
+     */
+    public ConsumerInfo getLocalInfo(){
+        return localInfo;
+    }
+
+    /**
+     * @param localInfo The localInfo to set.
+     */
+    public void setLocalInfo(ConsumerInfo localInfo){
+        this.localInfo=localInfo;
+    }
+
+    /**
+     * @return Returns the remoteInfo.
+     */
+    public ConsumerInfo getRemoteInfo(){
+        return remoteInfo;
+    }
+
+    /**
+     * @param remoteInfo The remoteInfo to set.
+     */
+    public void setRemoteInfo(ConsumerInfo remoteInfo){
+        this.remoteInfo=remoteInfo;
+    }
+    
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=378700&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.Transport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Consolidates subscriptions
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class DurableConduitBridge extends ConduitBridge{
+    static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+    /**
+     * Constructor
+     * @param localBroker
+     * @param remoteBroker
+     */
+    public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
+        super(localBroker,remoteBroker);
+    }
+    
+    /**
+     * Subscriptions for these desitnations are always created
+     * @throws IOException 
+     *
+     */
+    protected void setupStaticDestinations() throws IOException{
+        super.setupStaticDestinations();
+        ActiveMQDestination[] dests=durableDestinations;
+        if(dests!=null){
+            for(int i=0;i<dests.length;i++){
+                ActiveMQDestination dest=dests[i];
+                if(isPermissableDestination(dest)){
+                    DemandSubscription sub=createDemandSubscription(dest);
+                    addSubscription(sub);
+                    if(log.isTraceEnabled())
+                        log.trace("Forwarding messages for durable destination: "+dest);
+                }
+            }
+        }
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
Fri Feb 17 23:20:16 2006
@@ -32,6 +32,7 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
@@ -80,7 +81,7 @@
     public void start() throws Exception {
         log.info("Starting a network connection between " + localBroker + " and " + remoteBroker
+ " has been established.");
 
-        localBroker.setTransportListener(new TransportListener(){
+        localBroker.setTransportListener(new DefaultTransportListener(){
             public void onCommand(Command command) {
                 serviceLocalCommand(command);
             }
@@ -89,7 +90,7 @@
             }
         });
         
-        remoteBroker.setTransportListener(new TransportListener(){
+        remoteBroker.setTransportListener(new DefaultTransportListener(){
             public void onCommand(Command command) {
                 serviceRemoteCommand(command);
             }
@@ -192,7 +193,7 @@
                     }
                 }
             } else {
-                System.out.println("Unexpected remote command: "+command);
+                log.warn("Unexpected remote command: "+command);
             }
         } catch (IOException e) {
             serviceLocalException(e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Fri Feb 17 23:20:16 2006
@@ -22,6 +22,8 @@
 import java.util.Set;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -49,7 +51,14 @@
     private ConcurrentHashMap bridges = new ConcurrentHashMap();
     private Set durableDestinations;
     private boolean failover=true;
-    private boolean decreaseNetowrkConsumerPriority;
+    private ActiveMQDestination[] excludedDestinations;
+    private ActiveMQDestination[] dynamicallyIncludedDestinations;
+    private ActiveMQDestination[] staticallyIncludedDestinations;
+    private boolean dynamicOnly = false;
+    private boolean conduitSubscriptions = true;
+    private boolean decreaseNetworkConsumerPriority;
+    private int networkTTL = 1;
+    
     
     public NetworkConnector(){
         
@@ -182,24 +191,6 @@
         setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
     }    
 
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final
DiscoveryEvent event) {
-        DemandForwardingBridge result =  new DemandForwardingBridge(localTransport, remoteTransport)
{
-            protected void serviceRemoteException(IOException error) {
-                super.serviceRemoteException(error);
-                try {
-                    // Notify the discovery agent that the remote broker failed.
-                    discoveryAgent.serviceFailed(event);
-                } catch (IOException e) {
-                }
-            }
-        };
-        result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
-        result.setLocalBrokerName(brokerName);
-        return result;
-    }
-
     
 
     public boolean isFailover() {
@@ -243,13 +234,167 @@
     }
 
 
-    public boolean isDecreaseNetowrkConsumerPriority() {
-        return decreaseNetowrkConsumerPriority;
+    /**
+     * @return Returns the dynamicallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getDynamicallyIncludedDestinations(){
+        return dynamicallyIncludedDestinations;
+    }
+
+
+    /**
+     * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
+     */
+    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations){
+        this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
+    }
+
+
+    /**
+     * @return Returns the dynamicOnly.
+     */
+    public boolean isDynamicOnly(){
+        return dynamicOnly;
+    }
+
+
+    /**
+     * @param dynamicOnly The dynamicOnly to set.
+     */
+    public void setDynamicOnly(boolean dynamicOnly){
+        this.dynamicOnly=dynamicOnly;
+    }
+    
+    /**
+     * @return Returns the conduitSubscriptions.
+     */
+    public boolean isConduitSubscriptions(){
+        return conduitSubscriptions;
+    }
+
+
+    /**
+     * @param conduitSubscriptions The conduitSubscriptions to set.
+     */
+    public void setConduitSubscriptions(boolean conduitSubscriptions){
+        this.conduitSubscriptions=conduitSubscriptions;
+    }
+    
+    /**
+     * @return Returns the decreaseNetworkConsumerPriority.
+     */
+    public boolean isDecreaseNetworkConsumerPriority(){
+        return decreaseNetworkConsumerPriority;
+    }
+
+    /**
+     * @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
+     */
+    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
+        this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
+    }
+    
+    /**
+     * @return Returns the networkTTL.
+     */
+    public int getNetworkTTL(){
+        return networkTTL;
+    }
+
+    /**
+     * @param networkTTL The networkTTL to set.
+     */
+    public void setNetworkTTL(int networkTTL){
+        this.networkTTL=networkTTL;
+    }
+
+
+    /**
+     * @return Returns the excludedDestinations.
+     */
+    public ActiveMQDestination[] getExcludedDestinations(){
+        return excludedDestinations;
+    }
+
+
+    /**
+     * @param excludedDestinations The excludedDestinations to set.
+     */
+    public void setExcludedDestinations(ActiveMQDestination[] exludedDestinations){
+        this.excludedDestinations=exludedDestinations;
+    }
+
+
+    /**
+     * @return Returns the staticallyIncludedDestinations.
+     */
+    public ActiveMQDestination[] getStaticallyIncludedDestinations(){
+        return staticallyIncludedDestinations;
     }
 
 
-    public void setDecreaseNetowrkConsumerPriority(boolean decreaseNetowrkConsumerPriority)
{
-        this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
+    /**
+     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
+     */
+    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations){
+        this.staticallyIncludedDestinations=staticallyIncludedDestinations;
     }
+    
+   
+
+    
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final
DiscoveryEvent event) {
+        DemandForwardingBridge result = null;
+        if (conduitSubscriptions){
+            if (dynamicOnly){
+                result = new ConduitBridge(localTransport, remoteTransport) {
+                    protected void serviceRemoteException(IOException error) {
+                        super.serviceRemoteException(error);
+                        try {
+                            // Notify the discovery agent that the remote broker failed.
+                            discoveryAgent.serviceFailed(event);
+                        } catch (IOException e) {
+                        }
+                    }
+                };
+            }else {
+                result = new DurableConduitBridge(localTransport, remoteTransport) {
+                    protected void serviceRemoteException(IOException error) {
+                        super.serviceRemoteException(error);
+                        try {
+                            // Notify the discovery agent that the remote broker failed.
+                            discoveryAgent.serviceFailed(event);
+                        } catch (IOException e) {
+                        }
+                    }
+                };
+            }
+        }else {
+         result = new DemandForwardingBridge(localTransport, remoteTransport) {
+            protected void serviceRemoteException(IOException error) {
+                super.serviceRemoteException(error);
+                try {
+                    // Notify the discovery agent that the remote broker failed.
+                    discoveryAgent.serviceFailed(event);
+                } catch (IOException e) {
+                }
+            }
+        };
+        }
+        result.setLocalBrokerName(brokerName);
+        result.setNetworkTTL(getNetworkTTL());
+        result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
+        result.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
+        result.setExcludedDestinations(getExcludedDestinations());
+        result.setStaticallyIncludedDestinations(getStaticallyIncludedDestinations());
+        if (durableDestinations != null){
+            ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
+            dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
+            result.setDurableDestinations(dest);
+        }
+        return result;
+    } 
 
 }



Mime
View raw message