Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Wed Aug 8 11:56:59 2007 @@ -28,70 +28,74 @@ import java.util.Iterator; import java.util.List; - /** * Consolidates subscriptions * * @version $Revision: 1.1 $ */ -public class ConduitBridge extends DemandForwardingBridge{ - static final private Log log=LogFactory.getLog(ConduitBridge.class); +public class ConduitBridge extends DemandForwardingBridge { + static final private Log log = LogFactory.getLog(ConduitBridge.class); + /** * Constructor + * * @param localBroker * @param remoteBroker */ - public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){ - super(configuration,localBroker,remoteBroker); + public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, + Transport remoteBroker) { + super(configuration, localBroker, remoteBroker); } - - protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{ - - if (addToAlreadyInterestedConsumers(info)){ - return null; //don't want this subscription added + + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { + + if (addToAlreadyInterestedConsumers(info)) { + return null; // don't want this subscription added } return doCreateDemandSubscription(info); } - - protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info){ - - if( info.getSelector()!=null ) - return false; - - //search through existing subscriptions and see if we have a match + + protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { + + if (info.getSelector() != null) + return false; + + // search through existing subscriptions and see if we have a match boolean matched = false; - DestinationFilter filter=DestinationFilter.parseFilter(info.getDestination()); - for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){ + DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination()); + for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { DemandSubscription ds = (DemandSubscription)i.next(); - if (filter.matches(ds.getLocalInfo().getDestination())){ - //add the interest in the subscription - //ds.add(ds.getRemoteInfo().getConsumerId()); + if (filter.matches(ds.getLocalInfo().getDestination())) { + // add the interest in the subscription + // ds.add(ds.getRemoteInfo().getConsumerId()); ds.add(info.getConsumerId()); matched = true; - //continue - we want interest to any existing DemandSubscriptions + // continue - we want interest to any existing + // DemandSubscriptions } } return matched; } - - protected void removeDemandSubscription(ConsumerId id) throws IOException{ + + protected void removeDemandSubscription(ConsumerId id) throws IOException { List tmpList = new ArrayList(); - - for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){ + + for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { DemandSubscription ds = (DemandSubscription)i.next(); ds.remove(id); - if (ds.isEmpty()){ + if (ds.isEmpty()) { tmpList.add(ds); } } - for (Iterator i = tmpList.iterator(); i.hasNext();){ - DemandSubscription ds = (DemandSubscription) i.next(); + for (Iterator i = tmpList.iterator(); i.hasNext();) { + DemandSubscription ds = (DemandSubscription)i.next(); subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId()); removeSubscription(ds); - if(log.isTraceEnabled()) - log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+ds.getRemoteInfo()); + if (log.isTraceEnabled()) + log.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " : " + + ds.getRemoteInfo()); } - + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Aug 8 11:56:59 2007 @@ -35,41 +35,42 @@ */ public class DemandForwardingBridge extends DemandForwardingBridgeSupport { - protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null }; + protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null}; protected Object brokerInfoMutex = new Object(); protected BrokerId remoteBrokerId; - public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){ - super(configuration,localBroker, remoteBroker); + public DemandForwardingBridge(NetworkBridgeConfiguration configuration, Transport localBroker, + Transport remoteBroker) { + super(configuration, localBroker, remoteBroker); } protected void serviceRemoteBrokerInfo(Command command) throws IOException { - synchronized(brokerInfoMutex){ - BrokerInfo remoteBrokerInfo=(BrokerInfo) command; - remoteBrokerId=remoteBrokerInfo.getBrokerId(); - remoteBrokerPath[0]=remoteBrokerId; - remoteBrokerName=remoteBrokerInfo.getBrokerName(); - if(localBrokerId!=null){ - if(localBrokerId.equals(remoteBrokerId)){ + synchronized (brokerInfoMutex) { + BrokerInfo remoteBrokerInfo = (BrokerInfo)command; + remoteBrokerId = remoteBrokerInfo.getBrokerId(); + remoteBrokerPath[0] = remoteBrokerId; + remoteBrokerName = remoteBrokerInfo.getBrokerName(); + if (localBrokerId != null) { + if (localBrokerId.equals(remoteBrokerId)) { log.info("Disconnecting loop back connection."); - //waitStarted(); + // waitStarted(); ServiceSupport.dispose(this); } } - remoteBrokerNameKnownLatch.countDown(); + remoteBrokerNameKnownLatch.countDown(); } } protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) { - info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath())); + info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); } protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { - synchronized(brokerInfoMutex){ - localBrokerId=((BrokerInfo) command).getBrokerId(); - localBrokerPath[0]=localBrokerId; - if(remoteBrokerId!=null){ - if(remoteBrokerId.equals(localBrokerId)){ + synchronized (brokerInfoMutex) { + localBrokerId = ((BrokerInfo)command).getBrokerId(); + localBrokerPath[0] = localBrokerId; + if (remoteBrokerId != null) { + if (remoteBrokerId.equals(localBrokerId)) { log.info("Disconnecting loop back connection."); waitStarted(); ServiceSupport.dispose(this); @@ -77,12 +78,12 @@ } } } - + protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL()); } - - protected BrokerId[] getRemoteBrokerPath(){ + + protected BrokerId[] getRemoteBrokerPath() { return remoteBrokerPath; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java Wed Aug 8 11:56:59 2007 @@ -29,92 +29,93 @@ * * @version $Revision: 1.1 $ */ -public class DemandSubscription{ +public class DemandSubscription { private ConsumerInfo remoteInfo; private ConsumerInfo localInfo; private Set remoteSubsIds = new CopyOnWriteArraySet(); private AtomicInteger dispatched = new AtomicInteger(0); - DemandSubscription(ConsumerInfo info){ - remoteInfo=info; - localInfo=info.copy(); + DemandSubscription(ConsumerInfo info) { + remoteInfo = info; + localInfo = info.copy(); localInfo.setBrokerPath(info.getBrokerPath()); remoteSubsIds.add(info.getConsumerId()); - } + } /** * Increment the consumers associated with this subscription + * * @param id * @return true if added */ - public boolean add(ConsumerId id){ + public boolean add(ConsumerId id) { return remoteSubsIds.add(id); } - + /** * Increment the consumers associated with this subscription + * * @param id * @return true if added */ - public boolean remove(ConsumerId id){ + public boolean remove(ConsumerId id) { return remoteSubsIds.remove(id); } - + /** * @return true if there are no interested consumers */ - public boolean isEmpty(){ + public boolean isEmpty() { return remoteSubsIds.isEmpty(); } - - + /** * @return Returns the dispatched. */ - public int getDispatched(){ + public int getDispatched() { return dispatched.get(); } /** * @param dispatched The dispatched to set. */ - public void setDispatched(int dispatched){ + public void setDispatched(int dispatched) { this.dispatched.set(dispatched); } - + /** * @return dispatched count after incremented */ - public int incrementDispatched(){ + public int incrementDispatched() { return dispatched.incrementAndGet(); } /** * @return Returns the localInfo. */ - public ConsumerInfo getLocalInfo(){ + public ConsumerInfo getLocalInfo() { return localInfo; } /** * @param localInfo The localInfo to set. */ - public void setLocalInfo(ConsumerInfo localInfo){ - this.localInfo=localInfo; + public void setLocalInfo(ConsumerInfo localInfo) { + this.localInfo = localInfo; } /** * @return Returns the remoteInfo. */ - public ConsumerInfo getRemoteInfo(){ + public ConsumerInfo getRemoteInfo() { return remoteInfo; } /** * @param remoteInfo The remoteInfo to set. */ - public void setRemoteInfo(ConsumerInfo remoteInfo){ - this.remoteInfo=remoteInfo; + public void setRemoteInfo(ConsumerInfo remoteInfo) { + this.remoteInfo = remoteInfo; } - + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Aug 8 11:56:59 2007 @@ -44,7 +44,7 @@ private DiscoveryAgent discoveryAgent; private ConcurrentHashMap bridges = new ConcurrentHashMap(); - + public DiscoveryNetworkConnector() { } @@ -56,74 +56,74 @@ setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); } - public void onServiceAdd(DiscoveryEvent event){ - String localURIName=localURI.getScheme() + "://" + localURI.getHost(); + public void onServiceAdd(DiscoveryEvent event) { + String localURIName = localURI.getScheme() + "://" + localURI.getHost(); // Ignore events once we start stopping. - if(serviceSupport.isStopped()||serviceSupport.isStopping()) + if (serviceSupport.isStopped() || serviceSupport.isStopping()) return; - String url=event.getServiceName(); - if(url!=null){ + String url = event.getServiceName(); + if (url != null) { URI uri; - try{ - uri=new URI(url); - }catch(URISyntaxException e){ - log.warn("Could not connect to remote URI: "+url+" due to bad URI syntax: "+e,e); + try { + uri = new URI(url); + } catch (URISyntaxException e) { + log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); return; } // Should we try to connect to that URI? - if(bridges.containsKey(uri)||localURI.equals(uri) - ||(connectionFilter!=null&&!connectionFilter.connectTo(uri))) + if (bridges.containsKey(uri) || localURI.equals(uri) + || (connectionFilter != null && !connectionFilter.connectTo(uri))) return; - URI connectUri=uri; - log.info("Establishing network connection between from "+localURIName+" to "+connectUri); + URI connectUri = uri; + log.info("Establishing network connection between from " + localURIName + " to " + connectUri); Transport remoteTransport; - try{ - remoteTransport=TransportFactory.connect(connectUri); - }catch(Exception e){ - log.warn("Could not connect to remote URI: "+localURIName+": "+e.getMessage()); - log.debug("Connection failure exception: "+e,e); + try { + remoteTransport = TransportFactory.connect(connectUri); + } catch (Exception e) { + log.warn("Could not connect to remote URI: " + localURIName + ": " + e.getMessage()); + log.debug("Connection failure exception: " + e, e); return; } Transport localTransport; - try{ - localTransport=createLocalTransport(); - }catch(Exception e){ + try { + localTransport = createLocalTransport(); + } catch (Exception e) { ServiceSupport.dispose(remoteTransport); - log.warn("Could not connect to local URI: "+localURIName+": "+e.getMessage()); - log.debug("Connection failure exception: "+e,e); + log.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage()); + log.debug("Connection failure exception: " + e, e); return; } - NetworkBridge bridge=createBridge(localTransport,remoteTransport,event); - bridges.put(uri,bridge); - try{ + NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); + bridges.put(uri, bridge); + try { bridge.start(); - }catch(Exception e){ + } catch (Exception e) { ServiceSupport.dispose(localTransport); ServiceSupport.dispose(remoteTransport); - log.warn("Could not start network bridge between: "+localURIName+" and: "+uri+" due to: "+e); - log.debug("Start failure exception: "+e,e); - try{ + log.warn("Could not start network bridge between: " + localURIName + " and: " + uri + + " due to: " + e); + log.debug("Start failure exception: " + e, e); + try { discoveryAgent.serviceFailed(event); - }catch(IOException e1){ + } catch (IOException e1) { } return; } } } - + public void onServiceRemove(DiscoveryEvent event) { String url = event.getServiceName(); if (url != null) { URI uri; try { uri = new URI(url); - } - catch (URISyntaxException e) { + } catch (URISyntaxException e) { log.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); return; } - NetworkBridge bridge = (NetworkBridge) bridges.remove(uri); + NetworkBridge bridge = (NetworkBridge)bridges.remove(uri); if (bridge == null) return; @@ -153,55 +153,52 @@ protected void handleStop(ServiceStopper stopper) throws Exception { for (Iterator i = bridges.values().iterator(); i.hasNext();) { - NetworkBridge bridge = (NetworkBridge) i.next(); + NetworkBridge bridge = (NetworkBridge)i.next(); try { bridge.stop(); - } - catch (Exception e) { + } catch (Exception e) { stopper.onException(this, e); } } try { this.discoveryAgent.stop(); - } - catch (Exception e) { + } catch (Exception e) { stopper.onException(this, e); } super.handleStop(stopper); } - protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { + protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, + final DiscoveryEvent event) { NetworkBridgeListener listener = new NetworkBridgeListener() { - public void bridgeFailed(){ - if( !serviceSupport.isStopped() ) { + public void bridgeFailed() { + if (!serviceSupport.isStopped()) { try { discoveryAgent.serviceFailed(event); } catch (IOException e) { } } - + } - public void onStart(NetworkBridge bridge) { - registerNetworkBridgeMBean(bridge); - } + public void onStart(NetworkBridge bridge) { + registerNetworkBridgeMBean(bridge); + } - public void onStop(NetworkBridge bridge) { - unregisterNetworkBridgeMBean(bridge); - } + public void onStop(NetworkBridge bridge) { + unregisterNetworkBridgeMBean(bridge); + } - }; - DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener); + DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, + remoteTransport, listener); return configureBridge(result); } public String getName() { return discoveryAgent.toString(); } - - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Wed Aug 8 11:56:59 2007 @@ -26,81 +26,85 @@ import java.io.IOException; import java.util.Iterator; + /** * Consolidates subscriptions * * @version $Revision: 1.1 $ */ -public class DurableConduitBridge extends ConduitBridge{ - static final private Log log=LogFactory.getLog(DurableConduitBridge.class); +public class DurableConduitBridge extends ConduitBridge { + static final private Log log = LogFactory.getLog(DurableConduitBridge.class); /** * Constructor - * @param configuration + * + * @param configuration * * @param localBroker * @param remoteBroker */ - public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){ - super(configuration,localBroker,remoteBroker); + public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, + Transport remoteBroker) { + super(configuration, localBroker, remoteBroker); } /** * Subscriptions for these destinations are always created * */ - protected void setupStaticDestinations(){ + protected void setupStaticDestinations() { super.setupStaticDestinations(); - ActiveMQDestination[] dests=durableDestinations; - if(dests!=null){ - for(int i=0;i (queueConsumerInfo.getPrefetchSize()/2) ) { -// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched)); -// queueDispatched=0; -// } -// } else { -// topicDispatched++; -// if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) { -// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched)); -// topicDispatched=0; -// } -// } - } else if(command.isBrokerInfo() ) { - synchronized( this ) { - localBrokerInfo = ((BrokerInfo)command); + + // Ack on every message since we don't know if the broker is + // blocked due to memory + // usage and is waiting for an Ack to un-block him. + + // Acking a range is more efficient, but also more prone to + // locking up a server + // Perhaps doing something like the following should be policy + // based. + // if( + // md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) + // ) { + // queueDispatched++; + // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) + // ) { + // localBroker.oneway(new MessageAck(md, + // MessageAck.STANDARD_ACK_TYPE, queueDispatched)); + // queueDispatched=0; + // } + // } else { + // topicDispatched++; + // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) + // ) { + // localBroker.oneway(new MessageAck(md, + // MessageAck.STANDARD_ACK_TYPE, topicDispatched)); + // topicDispatched=0; + // } + // } + } else if (command.isBrokerInfo()) { + synchronized (this) { + localBrokerInfo = ((BrokerInfo)command); localBrokerId = localBrokerInfo.getBrokerId(); - if( remoteBrokerId !=null) { - if( remoteBrokerId.equals(localBrokerId) ) { + if (remoteBrokerId != null) { + if (remoteBrokerId.equals(localBrokerId)) { log.info("Disconnecting loop back connection."); ServiceSupport.dispose(this); } else { - triggerStartBridge(); + triggerStartBridge(); } } } } else { - log.debug("Unexpected local command: "+command); + log.debug("Unexpected local command: " + command); } } catch (IOException e) { serviceLocalException(e); @@ -307,6 +321,7 @@ public String getClientId() { return clientId; } + public void setClientId(String clientId) { this.clientId = clientId; } @@ -314,6 +329,7 @@ public int getPrefetchSize() { return prefetchSize; } + public void setPrefetchSize(int prefetchSize) { this.prefetchSize = prefetchSize; } @@ -321,6 +337,7 @@ public boolean isDispatchAsync() { return dispatchAsync; } + public void setDispatchAsync(boolean dispatchAsync) { this.dispatchAsync = dispatchAsync; } @@ -328,44 +345,44 @@ public String getDestinationFilter() { return destinationFilter; } + public void setDestinationFilter(String destinationFilter) { this.destinationFilter = destinationFilter; } - - public void setNetworkBridgeFailedListener(NetworkBridgeListener listener){ - this.bridgeFailedListener=listener; + public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) { + this.bridgeFailedListener = listener; } - + private void fireBridgeFailed() { NetworkBridgeListener l = this.bridgeFailedListener; - if (l!=null) { + if (l != null) { l.bridgeFailed(); } } - public String getRemoteAddress() { - return remoteBroker.getRemoteAddress(); - } - - public String getLocalAddress() { - return localBroker.getRemoteAddress(); - } - - public String getLocalBrokerName() { - return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); - } - - public String getRemoteBrokerName() { - return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); - } - - public long getDequeueCounter() { - return dequeueCounter.get(); - } - - public long getEnqueueCounter() { - return enqueueCounter.get(); - } + public String getRemoteAddress() { + return remoteBroker.getRemoteAddress(); + } + + public String getLocalAddress() { + return localBroker.getRemoteAddress(); + } + + public String getLocalBrokerName() { + return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); + } + + public String getRemoteBrokerName() { + return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); + } + + public long getDequeueCounter() { + return dequeueCounter.get(); + } + + public long getEnqueueCounter() { + return enqueueCounter.get(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/MulticastNetworkConnector.java Wed Aug 8 11:56:59 2007 @@ -121,24 +121,21 @@ if (bridge != null) { try { bridge.stop(); - } - catch (Exception e) { + } catch (Exception e) { stopper.onException(this, e); } } if (remoteTransport != null) { try { remoteTransport.stop(); - } - catch (Exception e) { + } catch (Exception e) { stopper.onException(this, e); } } if (localTransport != null) { try { localTransport.stop(); - } - catch (Exception e) { + } catch (Exception e) { stopper.onException(this, e); } } @@ -149,7 +146,7 @@ } protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) { - return new CompositeDemandForwardingBridge(this,local, remote); + return new CompositeDemandForwardingBridge(this, local, remote); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java Wed Aug 8 11:56:59 2007 @@ -23,48 +23,44 @@ */ public class NetworkBridgeFactory { - /** - * Create a network bridge - * - * @param config - * @param localTransport - * @param remoteTransport - * @return the NetworkBridge - */ - public static DemandForwardingBridge createBridge( - NetworkBridgeConfiguration config, Transport localTransport, - Transport remoteTransport) { - return createBridge(config, localTransport, remoteTransport, null); - } + /** + * Create a network bridge + * + * @param config + * @param localTransport + * @param remoteTransport + * @return the NetworkBridge + */ + public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config, + Transport localTransport, Transport remoteTransport) { + return createBridge(config, localTransport, remoteTransport, null); + } - /** - * create a network bridge - * - * @param configuration - * @param localTransport - * @param remoteTransport - * @param listener - * @return the NetworkBridge - */ - public static DemandForwardingBridge createBridge( - NetworkBridgeConfiguration configuration, Transport localTransport, - Transport remoteTransport, final NetworkBridgeListener listener) { - DemandForwardingBridge result = null; - if (configuration.isConduitSubscriptions()) { - if (configuration.isDynamicOnly()) { - result = new ConduitBridge(configuration, localTransport, - remoteTransport); - } else { - result = new DurableConduitBridge(configuration, - localTransport, remoteTransport); - } - } else { - result = new DemandForwardingBridge(configuration, localTransport, - remoteTransport); - } - if (listener != null) { - result.setNetworkBridgeListener(listener); - } - return result; - } + /** + * create a network bridge + * + * @param configuration + * @param localTransport + * @param remoteTransport + * @param listener + * @return the NetworkBridge + */ + public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration, + Transport localTransport, Transport remoteTransport, + final NetworkBridgeListener listener) { + DemandForwardingBridge result = null; + if (configuration.isConduitSubscriptions()) { + if (configuration.isDynamicOnly()) { + result = new ConduitBridge(configuration, localTransport, remoteTransport); + } else { + result = new DurableConduitBridge(configuration, localTransport, remoteTransport); + } + } else { + result = new DemandForwardingBridge(configuration, localTransport, remoteTransport); + } + if (listener != null) { + result.setNetworkBridgeListener(listener); + } + return result; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java Wed Aug 8 11:56:59 2007 @@ -16,31 +16,26 @@ */ package org.apache.activemq.network; - - /** - *called when a bridge fails + * called when a bridge fails * * @version $Revision: 1.1 $ */ -public interface NetworkBridgeListener{ - +public interface NetworkBridgeListener { + /** * called when the transport fails - * */ public void bridgeFailed(); /** * called after the bridge is started. - * */ - public void onStart(NetworkBridge bridge); - + public void onStart(NetworkBridge bridge); + /** * called before the bridge is stopped. - * */ - public void onStop(NetworkBridge bridge); + public void onStop(NetworkBridge bridge); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Wed Aug 8 11:56:59 2007 @@ -41,236 +41,231 @@ /** * @version $Revision$ */ -public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{ +public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service { - protected static final Log log=LogFactory.getLog(NetworkConnector.class); + protected static final Log log = LogFactory.getLog(NetworkConnector.class); protected URI localURI; private Set durableDestinations; - private List excludedDestinations=new CopyOnWriteArrayList(); - private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList(); - private List staticallyIncludedDestinations=new CopyOnWriteArrayList(); + private List excludedDestinations = new CopyOnWriteArrayList(); + private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList(); + private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); protected ConnectionFilter connectionFilter; private BrokerService brokerService; private ObjectName objectName; - - protected ServiceSupport serviceSupport=new ServiceSupport(){ - protected void doStart() throws Exception{ - handleStart(); + protected ServiceSupport serviceSupport = new ServiceSupport() { + + protected void doStart() throws Exception { + handleStart(); } - protected void doStop(ServiceStopper stopper) throws Exception{ + protected void doStop(ServiceStopper stopper) throws Exception { handleStop(stopper); } }; - public NetworkConnector(){ + public NetworkConnector() { } - public NetworkConnector(URI localURI){ - this.localURI=localURI; + public NetworkConnector(URI localURI) { + this.localURI = localURI; } - public URI getLocalUri() throws URISyntaxException{ + public URI getLocalUri() throws URISyntaxException { return localURI; } - public void setLocalUri(URI localURI){ - this.localURI=localURI; + public void setLocalUri(URI localURI) { + this.localURI = localURI; } - /** * @return Returns the durableDestinations. */ - public Set getDurableDestinations(){ + public Set getDurableDestinations() { return durableDestinations; } /** * @param durableDestinations The durableDestinations to set. */ - public void setDurableDestinations(Set durableDestinations){ - this.durableDestinations=durableDestinations; + public void setDurableDestinations(Set durableDestinations) { + this.durableDestinations = durableDestinations; } /** * @return Returns the excludedDestinations. */ - public List getExcludedDestinations(){ + public List getExcludedDestinations() { return excludedDestinations; } /** * @param excludedDestinations The excludedDestinations to set. */ - public void setExcludedDestinations(List excludedDestinations){ - this.excludedDestinations=excludedDestinations; + public void setExcludedDestinations(List excludedDestinations) { + this.excludedDestinations = excludedDestinations; } - public void addExcludedDestination(ActiveMQDestination destiantion){ + public void addExcludedDestination(ActiveMQDestination destiantion) { this.excludedDestinations.add(destiantion); } /** * @return Returns the staticallyIncludedDestinations. */ - public List getStaticallyIncludedDestinations(){ + public List getStaticallyIncludedDestinations() { return staticallyIncludedDestinations; } /** - * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. + * @param staticallyIncludedDestinations The staticallyIncludedDestinations + * to set. */ - public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){ - this.staticallyIncludedDestinations=staticallyIncludedDestinations; + public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) { + this.staticallyIncludedDestinations = staticallyIncludedDestinations; } - public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){ + public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { this.staticallyIncludedDestinations.add(destiantion); } /** * @return Returns the dynamicallyIncludedDestinations. */ - public List getDynamicallyIncludedDestinations(){ + public List getDynamicallyIncludedDestinations() { return dynamicallyIncludedDestinations; } /** - * @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set. + * @param dynamicallyIncludedDestinations The + * dynamicallyIncludedDestinations to set. */ - public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){ - this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations; + public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) { + this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; } - public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){ + public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { this.dynamicallyIncludedDestinations.add(destiantion); } - - public ConnectionFilter getConnectionFilter(){ + + public ConnectionFilter getConnectionFilter() { return connectionFilter; } - public void setConnectionFilter(ConnectionFilter connectionFilter){ - this.connectionFilter=connectionFilter; + public void setConnectionFilter(ConnectionFilter connectionFilter) { + this.connectionFilter = connectionFilter; } - // Implementation methods // ------------------------------------------------------------------------- - protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){ - List destsList=getDynamicallyIncludedDestinations(); - ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]); + protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) { + List destsList = getDynamicallyIncludedDestinations(); + ActiveMQDestination dests[] = (ActiveMQDestination[])destsList + .toArray(new ActiveMQDestination[destsList.size()]); result.setDynamicallyIncludedDestinations(dests); - destsList=getExcludedDestinations(); - dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]); + destsList = getExcludedDestinations(); + dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setExcludedDestinations(dests); - destsList=getStaticallyIncludedDestinations(); - dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]); + destsList = getStaticallyIncludedDestinations(); + dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setStaticallyIncludedDestinations(dests); - if(durableDestinations!=null){ - ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()]; - dest=(ActiveMQDestination[])durableDestinations.toArray(dest); + if (durableDestinations != null) { + ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()]; + dest = (ActiveMQDestination[])durableDestinations.toArray(dest); result.setDurableDestinations(dest); } return result; } - protected Transport createLocalTransport() throws Exception{ + protected Transport createLocalTransport() throws Exception { return TransportFactory.connect(localURI); } - public void start() throws Exception{ + public void start() throws Exception { serviceSupport.start(); } - public void stop() throws Exception{ + public void stop() throws Exception { serviceSupport.stop(); } - + public abstract String getName(); - - protected void handleStart() throws Exception{ - if(localURI==null){ + + protected void handleStart() throws Exception { + if (localURI == null) { throw new IllegalStateException("You must configure the 'localURI' property"); } - log.info("Network Connector "+getName()+" Started"); + log.info("Network Connector " + getName() + " Started"); } - protected void handleStop(ServiceStopper stopper) throws Exception{ - log.info("Network Connector "+getName()+" Stopped"); + protected void handleStop(ServiceStopper stopper) throws Exception { + log.info("Network Connector " + getName() + " Stopped"); } - + public ObjectName getObjectName() { - return objectName; - } + return objectName; + } + + public void setObjectName(ObjectName objectName) { + this.objectName = objectName; + } + + public BrokerService getBrokerService() { + return brokerService; + } - public void setObjectName(ObjectName objectName) { - this.objectName = objectName; - } - - public BrokerService getBrokerService() { - return brokerService; - } - - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } - - protected void registerNetworkBridgeMBean(NetworkBridge bridge) { - if (!getBrokerService().isUseJmx()) - return; - - MBeanServer mbeanServer = getBrokerService().getManagementContext() - .getMBeanServer(); - if (mbeanServer != null) { - NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); - try { - ObjectName objectName = createNetworkBridgeObjectName(bridge); - mbeanServer.registerMBean(view, objectName); - } catch (Throwable e) { - log.debug("Network bridge could not be registered in JMX: " - + e.getMessage(), e); - } - } - } - - protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) { - if (!getBrokerService().isUseJmx()) - return; - - MBeanServer mbeanServer = getBrokerService().getManagementContext() - .getMBeanServer(); - if (mbeanServer != null) { - try { - ObjectName objectName = createNetworkBridgeObjectName(bridge); - mbeanServer.unregisterMBean(objectName); - } catch (Throwable e) { - log.debug("Network bridge could not be unregistered in JMX: " - + e.getMessage(), e); - } - } - } - - protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) - throws MalformedObjectNameException { - ObjectName connectorName = getObjectName(); - Hashtable map = connectorName.getKeyPropertyList(); - return new ObjectName(connectorName.getDomain() - + ":" - + "BrokerName=" - + JMXSupport.encodeObjectNamePart((String) map - .get("BrokerName")) - + "," - + "Type=NetworkBridge," - + "NetworkConnectorName=" - + JMXSupport.encodeObjectNamePart((String) map - .get("NetworkConnectorName")) - + "," - + "Name=" - + JMXSupport.encodeObjectNamePart(JMXSupport - .encodeObjectNamePart(bridge.getRemoteAddress()))); - } + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + protected void registerNetworkBridgeMBean(NetworkBridge bridge) { + if (!getBrokerService().isUseJmx()) + return; + + MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + NetworkBridgeViewMBean view = new NetworkBridgeView(bridge); + try { + ObjectName objectName = createNetworkBridgeObjectName(bridge); + mbeanServer.registerMBean(view, objectName); + } catch (Throwable e) { + log.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e); + } + } + } + + protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) { + if (!getBrokerService().isUseJmx()) + return; + + MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + try { + ObjectName objectName = createNetworkBridgeObjectName(bridge); + mbeanServer.unregisterMBean(objectName); + } catch (Throwable e) { + log.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e); + } + } + } + + protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) + throws MalformedObjectNameException { + ObjectName connectorName = getObjectName(); + Hashtable map = connectorName.getKeyPropertyList(); + return new ObjectName(connectorName.getDomain() + + ":" + + "BrokerName=" + + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + + "," + + "Type=NetworkBridge," + + "NetworkConnectorName=" + + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + + "," + + "Name=" + + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge + .getRemoteAddress()))); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java Wed Aug 8 11:56:59 2007 @@ -16,7 +16,6 @@ */ package org.apache.activemq.network.jms; - /** * Create an Inbound Queue Bridge * @@ -24,38 +23,40 @@ * * @version $Revision: 1.1.1.1 $ */ -public class InboundQueueBridge extends QueueBridge{ - +public class InboundQueueBridge extends QueueBridge { + String inboundQueueName; String localQueueName; + /** * Constructor that takes a foriegn destination as an argument + * * @param inboundQueueName */ - public InboundQueueBridge(String inboundQueueName){ - this.inboundQueueName = inboundQueueName; - this.localQueueName = inboundQueueName; + public InboundQueueBridge(String inboundQueueName) { + this.inboundQueueName = inboundQueueName; + this.localQueueName = inboundQueueName; } - + /** * Default Contructor */ - public InboundQueueBridge(){ + public InboundQueueBridge() { } /** * @return Returns the inboundQueueName. */ - public String getInboundQueueName(){ + public String getInboundQueueName() { return inboundQueueName; } /** * @param inboundQueueName The inboundQueueName to set. */ - public void setInboundQueueName(String inboundQueueName){ - this.inboundQueueName=inboundQueueName; - if (this.localQueueName == null){ + public void setInboundQueueName(String inboundQueueName) { + this.inboundQueueName = inboundQueueName; + if (this.localQueueName == null) { this.localQueueName = inboundQueueName; } } @@ -63,15 +64,15 @@ /** * @return the localQueueName */ - public String getLocalQueueName(){ + public String getLocalQueueName() { return localQueueName; } /** * @param localQueueName the localQueueName to set */ - public void setLocalQueueName(String localQueueName){ - this.localQueueName=localQueueName; + public void setLocalQueueName(String localQueueName) { + this.localQueueName = localQueueName; } - + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java?view=diff&rev=563982&r1=563981&r2=563982 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java Wed Aug 8 11:56:59 2007 @@ -16,7 +16,6 @@ */ package org.apache.activemq.network.jms; - /** * Create an Inbound Topic Bridge * @@ -24,38 +23,40 @@ * * @version $Revision: 1.1.1.1 $ */ -public class InboundTopicBridge extends TopicBridge{ - +public class InboundTopicBridge extends TopicBridge { + String inboundTopicName; String localTopicName; + /** * Constructor that takes a foriegn destination as an argument + * * @param inboundTopicName */ - public InboundTopicBridge(String inboundTopicName){ + public InboundTopicBridge(String inboundTopicName) { this.inboundTopicName = inboundTopicName; this.localTopicName = inboundTopicName; } - + /** * Default Contructor */ - public InboundTopicBridge(){ + public InboundTopicBridge() { } /** * @return Returns the outboundTopicName. */ - public String getInboundTopicName(){ + public String getInboundTopicName() { return inboundTopicName; } /** - * @param inboundTopicName + * @param inboundTopicName */ - public void setInboundTopicName(String inboundTopicName){ - this.inboundTopicName=inboundTopicName; - if(this.localTopicName==null){ + public void setInboundTopicName(String inboundTopicName) { + this.inboundTopicName = inboundTopicName; + if (this.localTopicName == null) { this.localTopicName = inboundTopicName; } } @@ -63,15 +64,15 @@ /** * @return the localTopicName */ - public String getLocalTopicName(){ + public String getLocalTopicName() { return localTopicName; } /** * @param localTopicName the localTopicName to set */ - public void setLocalTopicName(String localTopicName){ - this.localTopicName=localTopicName; + public void setLocalTopicName(String localTopicName) { + this.localTopicName = localTopicName; } - + }