activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r835715 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: network/DemandForwardingBridgeSupport.java network/DemandSubscription.java transport/ResponseCorrelator.java
Date Fri, 13 Nov 2009 03:02:50 GMT
Author: cmacnaug
Date: Fri Nov 13 03:02:49 2009
New Revision: 835715

URL: http://svn.apache.org/viewvc?rev=835715&view=rev
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-2439: IllegalState issue.

Tightening synchronization in DemandForwardingBridgeSupport to avoid a race condition between
subscription cleanup and ack processing.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=835715&r1=835714&r2=835715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java	Fri Nov 13 03:02:49 2009
@@ -89,7 +89,7 @@
  * @version $Revision$
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware
{
-    
+
     private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
     private static final ThreadPoolExecutor ASYNC_TASKS;
     protected final Transport localBroker;
@@ -114,7 +114,7 @@
     protected ActiveMQDestination[] durableDestinations;
     protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId
= new ConcurrentHashMap<ConsumerId, DemandSubscription>();
     protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId
= new ConcurrentHashMap<ConsumerId, DemandSubscription>();
-    protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
+    protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected CountDownLatch localStartedLatch = new CountDownLatch(1);
     protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
@@ -125,12 +125,11 @@
 
     final AtomicLong enqueueCounter = new AtomicLong();
     final AtomicLong dequeueCounter = new AtomicLong();
-    
+
     private NetworkBridgeListener networkBridgeListener;
     private boolean createdByDuplex;
     private BrokerInfo localBrokerInfo;
     private BrokerInfo remoteBrokerInfo;
-    
 
     private AtomicBoolean started = new AtomicBoolean();
     private TransportConnection duplexInitiatingConnection;
@@ -155,7 +154,7 @@
             localBroker.setTransportListener(new DefaultTransportListener() {
 
                 public void onCommand(Object o) {
-                    Command command = (Command)o;
+                    Command command = (Command) o;
                     serviceLocalCommand(command);
                 }
 
@@ -166,7 +165,7 @@
             remoteBroker.setTransportListener(new TransportListener() {
 
                 public void onCommand(Object o) {
-                    Command command = (Command)o;
+                    Command command = (Command) o;
                     serviceRemoteCommand(command);
                 }
 
@@ -290,10 +289,10 @@
                 localConnectionInfo.setPassword(configuration.getPassword());
                 Transport originalTransport = remoteBroker;
                 while (originalTransport instanceof TransportFilter) {
-                    originalTransport = ((TransportFilter)originalTransport).getNext();
+                    originalTransport = ((TransportFilter) originalTransport).getNext();
                 }
                 if (originalTransport instanceof SslTransport) {
-                    X509Certificate[] peerCerts = ((SslTransport)originalTransport).getPeerCertificates();
+                    X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
                     localConnectionInfo.setTransportContext(peerCerts);
                 }
                 localBroker.oneway(localConnectionInfo);
@@ -385,10 +384,10 @@
                             } finally {
                                 sendShutdown.countDown();
                             }
-                            
+
                         }
                     });
-                    if( !sendShutdown.await(10, TimeUnit.SECONDS) ) {
+                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
                         LOG.info("Network Could not shutdown in a timely manner");
                     }
                 } finally {
@@ -424,12 +423,12 @@
         }
     }
 
-    protected void serviceRemoteCommand(Command command) {    	      	
+    protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
                 if (command.isMessageDispatch()) {
                     waitStarted();
-                    MessageDispatch md = (MessageDispatch)command;
+                    MessageDispatch md = (MessageDispatch) command;
                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                     demandConsumerDispatched++;
                     if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize()
* .75)) {
@@ -438,7 +437,7 @@
                     }
                 } else if (command.isBrokerInfo()) {
                     lastConnectSucceeded.set(true);
-                    remoteBrokerInfo = (BrokerInfo)command;
+                    remoteBrokerInfo = (BrokerInfo) command;
                     Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                     try {
                         IntrospectionSupport.getProperties(configuration, props, null);
@@ -463,18 +462,18 @@
                     // Let the local broker know the remote broker's ID.
                     localBroker.oneway(command);
                 } else if (command.getClass() == ConnectionError.class) {
-                    ConnectionError ce = (ConnectionError)command;
+                    ConnectionError ce = (ConnectionError) command;
                     serviceRemoteException(ce.getException());
                 } else {
                     if (isDuplex()) {
                         if (command.isMessage()) {
-                            ActiveMQMessage message = (ActiveMQMessage)command;
+                            ActiveMQMessage message = (ActiveMQMessage) command;
                             if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()))
{
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
                             } else {
-                            	if (!isPermissableDestination(message.getDestination(), true))
{
-                            		return;
-                            	}
+                                if (!isPermissableDestination(message.getDestination(), true))
{
+                                    return;
+                                }
                                 if (message.isResponseRequired()) {
                                     Response reply = new Response();
                                     reply.setCorrelationId(message.getCommandId());
@@ -492,20 +491,20 @@
                                 localBroker.oneway(command);
                                 break;
                             case ConsumerInfo.DATA_STRUCTURE_TYPE:
-                            	localStartedLatch.await();
+                                localStartedLatch.await();
                                 if (started.get()) {
                                     if (!addConsumerInfo((ConsumerInfo) command)) {
                                         if (LOG.isDebugEnabled()) {
-                                            LOG.debug("Ignoring ConsumerInfo: "+ command);
+                                            LOG.debug("Ignoring ConsumerInfo: " + command);
                                         }
                                     } else {
                                         if (LOG.isTraceEnabled()) {
-                                            LOG.trace("Adding ConsumerInfo: "+ command);
+                                            LOG.trace("Adding ConsumerInfo: " + command);
                                         }
                                     }
                                 } else {
                                     // received a subscription whilst stopping
-                                    LOG.warn("Stopping - ignoring ConsumerInfo: "+ command);
+                                    LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
                                 }
                                 break;
                             default:
@@ -538,9 +537,9 @@
         final int networkTTL = configuration.getNetworkTTL();
         if (data.getClass() == ConsumerInfo.class) {
             // Create a new local subscription
-            ConsumerInfo info = (ConsumerInfo)data;
+            ConsumerInfo info = (ConsumerInfo) data;
             BrokerId[] path = info.getBrokerPath();
-            
+
             if (path != null && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Ignoring sub  from " + remoteBrokerName
+ ", restricted to " + networkTTL + " network hops only : " + info);
@@ -553,7 +552,7 @@
                     LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ ", already routed through this broker once : " + info);
                 }
                 return;
-            }            
+            }
             if (!isPermissableDestination(info.getDestination())) {
                 // ignore if not in the permitted or in the excluded list
                 if (LOG.isDebugEnabled()) {
@@ -561,10 +560,10 @@
                 }
                 return;
             }
-            
+
             // in a cyclic network there can be multiple bridges per broker that can propagate
             // a network subscription so there is a need to synchronise on a shared entity
-            synchronized(brokerService.getVmConnectorURI()) {
+            synchronized (brokerService.getVmConnectorURI()) {
                 if (addConsumerInfo(info)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker
+ " from " + remoteBrokerName + " : " + info);
@@ -578,7 +577,7 @@
         } else if (data.getClass() == DestinationInfo.class) {
             // It's a destination info - we want to pass up
             // information about temporary destinations
-            DestinationInfo destInfo = (DestinationInfo)data;
+            DestinationInfo destInfo = (DestinationInfo) data;
             BrokerId[] path = destInfo.getBrokerPath();
             if (path != null && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
@@ -597,7 +596,7 @@
             destInfo.setConnectionId(localConnectionInfo.getConnectionId());
             if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
                 // re-set connection id so comes from here
-                ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destInfo.getDestination();
+                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
                 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
             }
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
@@ -606,7 +605,7 @@
             }
             localBroker.oneway(destInfo);
         } else if (data.getClass() == RemoveInfo.class) {
-            ConsumerId id = (ConsumerId)((RemoveInfo)data).getObjectId();
+            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
         }
     }
@@ -640,7 +639,7 @@
                 LOG.debug(configuration.getBrokerName() + " remove local subscription for
remote " + sub.getRemoteInfo().getConsumerId());
             }
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
-            
+
             // continue removal in separate thread to free up this thread for outstanding
responses
             ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
@@ -673,41 +672,46 @@
             try {
                 if (command.isMessageDispatch()) {
                     enqueueCounter.incrementAndGet();
-                    final MessageDispatch md = (MessageDispatch)command;   
+                    final MessageDispatch md = (MessageDispatch) command;
                     final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
-                    if (sub != null && md.getMessage()!=null) {
+                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses())
{
                         // See if this consumer's brokerPath tells us it came from the broker
at the other end
                         // of the bridge. I think we should be making this decision based
on the message's
                         // broker bread crumbs and not the consumer's? However, the message's
broker bread
                         // crumbs are null, which is another matter.   
                         boolean cameFromRemote = false;
-                        Object consumerInfo = md.getMessage().getDataStructure(); 
-                        if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo)
)                  	                   	  
-                    	    cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
                   	                           
-                                            	                     	
+                        Object consumerInfo = md.getMessage().getDataStructure();
+                        if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo))
+                            cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(),
remoteBrokerInfo.getBrokerId());
+
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("bridging " + configuration.getBrokerName() + " ->
" + remoteBrokerName + ": " + message);
                         }
-                        
+
                         if (!message.isResponseRequired()) {
 
                             // If the message was originally sent using async
                             // send, we will preserve that QOS
                             // by bridging it using an async send (small chance
                             // of message loss).
-                            
-                            // Don't send it off to the remote if it originally came from
the remote. 
-                            if( !cameFromRemote ) {
-                               remoteBroker.oneway(message);
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Message not forwarded on to remote, because
message came from remote");                               
+
+                            try {
+                                // Don't send it off to the remote if it originally came
from the remote. 
+                                if (!cameFromRemote) {
+                                    remoteBroker.oneway(message);
+                                } else {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Message not forwarded on to remote, because
message came from remote");
+                                    }
                                 }
+
+                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
+                                dequeueCounter.incrementAndGet();
+                            } finally {
+                                sub.decrementOutstandingResponses();
                             }
-                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
-                            dequeueCounter.incrementAndGet();
-                            
+
                         } else {
 
                             // The message was not sent using async send, so we
@@ -719,12 +723,12 @@
                                     try {
                                         Response response = future.getResult();
                                         if (response.isException()) {
-                                            ExceptionResponse er = (ExceptionResponse)response;
+                                            ExceptionResponse er = (ExceptionResponse) response;
                                             serviceLocalException(er.getException());
                                         } else {
                                             localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1));
                                             dequeueCounter.incrementAndGet();
-                                           
+
                                         }
                                     } catch (IOException e) {
                                         serviceLocalException(e);
@@ -735,16 +739,15 @@
                             };
 
                             remoteBroker.asyncRequest(message, callback);
-                            sub.incrementOutstandingResponses();
                         }
-                        
+
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("No subscription registered with this network bridge
for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
                         }
                     }
                 } else if (command.isBrokerInfo()) {
-                    localBrokerInfo = (BrokerInfo)command;
+                    localBrokerInfo = (BrokerInfo) command;
                     serviceLocalBrokerInfo(command);
                 } else if (command.isShutdownInfo()) {
                     LOG.info(configuration.getBrokerName() + " Shutting down");
@@ -757,7 +760,7 @@
                         stop();
                     }
                 } else if (command.getClass() == ConnectionError.class) {
-                    ConnectionError ce = (ConnectionError)command;
+                    ConnectionError ce = (ConnectionError) command;
                     serviceLocalException(ce.getException());
                 } else {
                     switch (command.getDataStructureType()) {
@@ -768,7 +771,7 @@
                     }
                 }
             } catch (Throwable e) {
-                LOG.warn("Caught an exception processing local command",e);
+                LOG.warn("Caught an exception processing local command", e);
                 serviceLocalException(e);
             }
         }
@@ -783,7 +786,7 @@
 
     /**
      * @param dynamicallyIncludedDestinations The
-     *                dynamicallyIncludedDestinations to set.
+     *            dynamicallyIncludedDestinations to set.
      */
     public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations)
{
         this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
@@ -812,7 +815,7 @@
 
     /**
      * @param staticallyIncludedDestinations The staticallyIncludedDestinations
-     *                to set.
+     *            to set.
      */
     public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations)
{
         this.staticallyIncludedDestinations = staticallyIncludedDestinations;
@@ -883,30 +886,30 @@
 
     protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
         if (brokerPath == null || brokerPath.length == 0) {
-            return new BrokerId[] {idToAppend};
+            return new BrokerId[] { idToAppend };
         }
         BrokerId rc[] = new BrokerId[brokerPath.length + 1];
         System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
         rc[brokerPath.length] = idToAppend;
         return rc;
     }
-    
+
     protected boolean isPermissableDestination(ActiveMQDestination destination) {
-    	return isPermissableDestination(destination, false);
+        return isPermissableDestination(destination, false);
     }
 
     protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary)
{
         // Are we not bridging temp destinations?
         if (destination.isTemporary()) {
-        	if (allowTemporary) {
-        		return true;
-        	} else {
-        		return configuration.isBridgeTempDestinations();
-        	}
-        } 
+            if (allowTemporary) {
+                return true;
+            } else {
+                return configuration.isBridgeTempDestinations();
+            }
+        }
 
         final DestinationFilter filter = DestinationFilter.parseFilter(destination);
-        
+
         ActiveMQDestination[] dests = excludedDestinations;
         if (dests != null && dests.length > 0) {
             for (int i = 0; i < dests.length; i++) {
@@ -972,7 +975,7 @@
         addRemoteBrokerToBrokerPath(info);
         DemandSubscription sub = createDemandSubscription(info);
         if (sub != null) {
-            if (duplicateSuppressionIsRequired(sub) ) {
+            if (duplicateSuppressionIsRequired(sub)) {
                 undoMapRegistration(sub);
             } else {
                 addSubscription(sub);
@@ -981,10 +984,10 @@
         }
         return consumerAdded;
     }
-    
+
     private void undoMapRegistration(DemandSubscription sub) {
         subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
-        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());    
+        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
     }
 
     /*
@@ -994,16 +997,16 @@
     private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
         final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
         boolean suppress = false;
-        
+
         if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions())
{
             return suppress;
         }
-        
-        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
       
+
+        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
         Collection<Subscription> currentSubs = 
             getRegionSubscriptions(consumerInfo.getDestination().isTopic());
         for (Subscription sub : currentSubs) {
-            List<ConsumerId> networkConsumers =  sub.getConsumerInfo().getNetworkConsumerIds();
+            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
             if (!networkConsumers.isEmpty()) {
                 if (matchFound(candidateConsumers, networkConsumers)) {
                     suppress = hasLowerPriority(sub, candidate.getLocalInfo());
@@ -1014,10 +1017,9 @@
         return suppress;
     }
 
- 
     private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo)
{
         boolean suppress = false;
-        
+
         if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority())
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription
from " + remoteBrokerName
@@ -1029,7 +1031,7 @@
             // remove the existing lower priority duplicate and allow this candidate
             try {
                 removeDuplicateSubscription(existingSub);
-                
+
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription
" + existingSub.getConsumerInfo()
                             + " with sub from " + remoteBrokerName
@@ -1037,23 +1039,23 @@
                             + candidateInfo.getNetworkConsumerIds());
                 }
             } catch (IOException e) {
-                LOG.error("Failed to remove duplicated sub as a result of sub with higher
priority, sub: "+ existingSub, e);
+                LOG.error("Failed to remove duplicated sub as a result of sub with higher
priority, sub: " + existingSub, e);
             }
         }
         return suppress;
     }
 
     private void removeDuplicateSubscription(Subscription existingSub) throws IOException
{
-        for (NetworkConnector connector: brokerService.getNetworkConnectors()) {
+        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
             if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId()))
{
                 break;
             }
-        }     
+        }
     }
 
     private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId>
networkConsumers) {
         boolean found = false;
-        for (ConsumerId aliasConsumer : networkConsumers) {        
+        for (ConsumerId aliasConsumer : networkConsumers) {
             if (candidateConsumers.contains(aliasConsumer)) {
                 found = true;
                 break;
@@ -1068,21 +1070,20 @@
             (isTopic ? region.getTopicRegion() : region.getQueueRegion());
         return abstractRegion.getSubscriptions().values();
     }
-    
+
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException
{
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
 
-    
     protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException
{
         DemandSubscription result = new DemandSubscription(info);
         result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
         if (info.getDestination().isTemporary()) {
             // reset the local connection Id
 
-            ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination();
+            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
             dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
         }
 
@@ -1090,7 +1091,7 @@
             byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
             if (info.getBrokerPath() != null && info.getBrokerPath().length >
1) {
                 // The longer the path to the consumer, the less it's consumer priority.
-                priority -= info.getBrokerPath().length + 1; 
+                priority -= info.getBrokerPath().length + 1;
             }
             result.getLocalInfo().setPriority(priority);
             if (LOG.isDebugEnabled()) {
@@ -1101,19 +1102,19 @@
         return result;
     }
 
-    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination){
+    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination)
{
         ConsumerInfo info = new ConsumerInfo();
         info.setDestination(destination);
         // the remote info held by the DemandSubscription holds the original
         // consumerId,
         // the local info get's overwritten
-       
+
         info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
         DemandSubscription result = null;
         try {
             result = createDemandSubscription(info);
         } catch (IOException e) {
-           LOG.error("Failed to create DemandSubscription ",e);
+            LOG.error("Failed to create DemandSubscription ", e);
         }
         if (result != null) {
             result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
@@ -1132,7 +1133,6 @@
         sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
     }
 
-    
     protected void removeDemandSubscription(ConsumerId id) throws IOException {
         DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
         if (LOG.isDebugEnabled()) {
@@ -1145,17 +1145,17 @@
             }
         }
     }
-    
+
     protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
         boolean removeDone = false;
         DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
         if (sub != null) {
             try {
                 removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
-                removeDone = true;   
+                removeDone = true;
             } catch (IOException e) {
                 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId,
e);
-            }     
+            }
         }
         return removeDone;
     }
@@ -1177,7 +1177,6 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
-    
 
     protected abstract BrokerId[] getRemoteBrokerPath();
 
@@ -1215,17 +1214,17 @@
     public long getEnqueueCounter() {
         return enqueueCounter.get();
     }
-    
+
     protected boolean isDuplex() {
         return configuration.isDuplex() || createdByDuplex;
     }
-    
+
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
-    
+
     static {
-        ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, "NetworkBridge");
                 thread.setDaemon(true);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=835715&r1=835714&r2=835715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java	Fri Nov 13 03:02:49 2009
@@ -33,10 +33,11 @@
  */
 public class DemandSubscription {
     private static final Log LOG = LogFactory.getLog(DemandSubscription.class);
-    
+
     private final ConsumerInfo remoteInfo;
     private final ConsumerInfo localInfo;
     private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
+
     private AtomicInteger dispatched = new AtomicInteger(0);
     private AtomicBoolean activeWaiter = new AtomicBoolean();
 
@@ -44,7 +45,7 @@
         remoteInfo = info;
         localInfo = info.copy();
         localInfo.setNetworkSubscription(true);
-        remoteSubsIds.add(info.getConsumerId());    
+        remoteSubsIds.add(info.getConsumerId());
     }
 
     /**
@@ -81,7 +82,6 @@
         return localInfo;
     }
 
-    
     /**
      * @return Returns the remoteInfo.
      */
@@ -111,13 +111,18 @@
 
     public void decrementOutstandingResponses() {
         if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
-            synchronized(activeWaiter) {
+            synchronized (activeWaiter) {
                 activeWaiter.notifyAll();
             }
         }
     }
 
-    public void incrementOutstandingResponses() {
-        dispatched.incrementAndGet(); 
+    public boolean incrementOutstandingResponses() {
+        dispatched.incrementAndGet();
+        if (activeWaiter.get()) {
+            decrementOutstandingResponses();
+            return false;
+        }
+        return true;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=835715&r1=835714&r2=835715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java	Fri Nov 13 03:02:49 2009
@@ -61,16 +61,23 @@
     }
 
     public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws
IOException {
-        Command command = (Command)o;
+        Command command = (Command) o;
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(true);
         FutureResponse future = new FutureResponse(responseCallback);
+        IOException priorError = null;
         synchronized (requestMap) {
-            if( this.error !=null ) {
-                throw error;
+            priorError = this.error;
+            if (priorError == null) {
+                requestMap.put(new Integer(command.getCommandId()), future);
             }
-            requestMap.put(new Integer(command.getCommandId()), future);
         }
+
+        if (priorError != null) {
+            future.set(new ExceptionResponse(priorError));
+            throw priorError;
+        }
+
         next.oneway(command);
         return future;
     }



Mime
View raw message