activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1432487 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-clie...
Date Sat, 12 Jan 2013 18:13:28 GMT
Author: tabish
Date: Sat Jan 12 18:13:27 2013
New Revision: 1432487

URL: http://svn.apache.org/viewvc?rev=1432487&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4248

Add an expanded transmit callback interface (TransmitCallback) so that a failure to transmit can be
distinguished from normal operation and no further dispatch attempts are made. This fixes the
current NPE when async dispatch is enabled.

Added:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1432487&r1=1432486&r2=1432487&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Sat Jan 12 18:13:27 2013
@@ -36,10 +36,44 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.transaction.xa.XAResource;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.IntegerResponse;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.network.DemandForwardingBridge;
 import org.apache.activemq.network.MBeanNetworkListener;
 import org.apache.activemq.network.NetworkBridgeConfiguration;
@@ -57,12 +91,12 @@ import org.apache.activemq.thread.TaskRu
 import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.ResponseCorrelator;
+import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
-import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,6 +208,7 @@ public class TransportConnection impleme
      *
      * @return size of dispatch queue
      */
+    @Override
     public int getDispatchQueueSize() {
         synchronized (dispatchQueue) {
             return dispatchQueue.size();
@@ -207,8 +242,7 @@ public class TransportConnection impleme
     }
 
     private boolean expected(IOException e) {
-        return isStomp() &&
-                ((e instanceof SocketException && e.getMessage().indexOf("reset")
!= -1) || e instanceof EOFException);
+        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset")
!= -1) || e instanceof EOFException);
     }
 
     private boolean isStomp() {
@@ -221,6 +255,7 @@ public class TransportConnection impleme
      * service exception closes a socket, we should not tie up broker threads
      * since client sockets may hang or cause deadlocks.
      */
+    @Override
     public void serviceExceptionAsync(final IOException e) {
         if (asyncException.compareAndSet(false, true)) {
             new Thread("Async Exception Handler") {
@@ -237,6 +272,7 @@ public class TransportConnection impleme
      * if: the client is closing or broker is closing. Otherwise, the connection
      * error transmitted to the client before stopping it's transport.
      */
+    @Override
     public void serviceException(Throwable e) {
         // are we a transport exception such as not being able to dispatch
         // synchronously to a transport
@@ -282,6 +318,7 @@ public class TransportConnection impleme
         }
     }
 
+    @Override
     public Response service(Command command) {
         MDC.put("activemq.connector", connector.getUri().toString());
         Response response = null;
@@ -324,30 +361,36 @@ public class TransportConnection impleme
         return response;
     }
 
+    @Override
     public Response processKeepAlive(KeepAliveInfo info) throws Exception {
         return null;
     }
 
+    @Override
     public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception
{
         broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(),
info);
         return null;
     }
 
+    @Override
     public Response processWireFormat(WireFormatInfo info) throws Exception {
         wireFormatInfo = info;
         protocolVersion.set(info.getVersion());
         return null;
     }
 
+    @Override
     public Response processShutdown(ShutdownInfo info) throws Exception {
         stopAsync();
         return null;
     }
 
+    @Override
     public Response processFlush(FlushCommand command) throws Exception {
         return null;
     }
 
+    @Override
     public Response processBeginTransaction(TransactionInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = null;
@@ -365,6 +408,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processEndTransaction(TransactionInfo info) throws Exception {
         // No need to do anything. This packet is just sent by the client
         // make sure he is synced with the server as commit command could
@@ -372,6 +416,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = null;
@@ -403,6 +448,7 @@ public class TransportConnection impleme
         }
     }
 
+    @Override
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception
{
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = cs.getContext();
@@ -411,6 +457,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception
{
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = cs.getContext();
@@ -419,6 +466,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRollbackTransaction(TransactionInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = cs.getContext();
@@ -427,6 +475,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processForgetTransaction(TransactionInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = cs.getContext();
@@ -434,6 +483,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRecoverTransactions(TransactionInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         context = cs.getContext();
@@ -441,6 +491,7 @@ public class TransportConnection impleme
         return new DataArrayResponse(preparedTransactions);
     }
 
+    @Override
     public Response processMessage(Message messageSend) throws Exception {
         ProducerId producerId = messageSend.getProducerId();
         ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
@@ -450,6 +501,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processMessageAck(MessageAck ack) throws Exception {
         ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
         if (consumerExchange != null) {
@@ -458,15 +510,18 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processMessagePull(MessagePull pull) throws Exception {
         return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),
pull);
     }
 
+    @Override
     public Response processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception {
         broker.processDispatchNotification(notification);
         return null;
     }
 
+    @Override
     public Response processAddDestination(DestinationInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         broker.addDestinationInfo(cs.getContext(), info);
@@ -476,6 +531,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRemoveDestination(DestinationInfo info) throws Exception {
         TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
         broker.removeDestinationInfo(cs.getContext(), info);
@@ -485,6 +541,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processAddProducer(ProducerInfo info) throws Exception {
         SessionId sessionId = info.getProducerId().getParentId();
         ConnectionId connectionId = sessionId.getParentId();
@@ -517,6 +574,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRemoveProducer(ProducerId id) throws Exception {
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
@@ -535,6 +593,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processAddConsumer(ConsumerInfo info) throws Exception {
         SessionId sessionId = info.getConsumerId().getParentId();
         ConnectionId connectionId = sessionId.getParentId();
@@ -569,6 +628,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws
Exception {
         SessionId sessionId = id.getParentId();
         ConnectionId connectionId = sessionId.getParentId();
@@ -593,6 +653,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processAddSession(SessionInfo info) throws Exception {
         ConnectionId connectionId = info.getSessionId().getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
@@ -609,6 +670,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws
Exception {
         ConnectionId connectionId = id.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
@@ -642,6 +704,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processAddConnection(ConnectionInfo info) throws Exception {
         // Older clients should have been defaulting this field to true.. but
         // they were not.
@@ -728,6 +791,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
             throws InterruptedException {
         LOG.debug("remove connection id: " + id);
@@ -776,15 +840,18 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processProducerAck(ProducerAck ack) throws Exception {
         // A broker should not get ProducerAck messages.
         return null;
     }
 
+    @Override
     public Connector getConnector() {
         return connector;
     }
 
+    @Override
     public void dispatchSync(Command message) {
         try {
             processDispatch(message);
@@ -793,6 +860,7 @@ public class TransportConnection impleme
         }
     }
 
+    @Override
     public void dispatchAsync(Command message) {
         if (!stopping.get()) {
             if (taskRunner == null) {
@@ -810,17 +878,17 @@ public class TransportConnection impleme
         } else {
             if (message.isMessageDispatch()) {
                 MessageDispatch md = (MessageDispatch) message;
-                Runnable sub = md.getTransmitCallback();
+                TransmitCallback sub = md.getTransmitCallback();
                 broker.postProcessDispatch(md);
                 if (sub != null) {
-                    sub.run();
+                    sub.onFailure();
                 }
             }
         }
     }
 
     protected void processDispatch(Command command) throws IOException {
-        final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch()
? command : null);
+        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch()
? command : null);
         try {
             if (!stopping.get()) {
                 if (messageDispatch != null) {
@@ -828,17 +896,27 @@ public class TransportConnection impleme
                 }
                 dispatch(command);
             }
+        } catch (Throwable e) {
+            if (messageDispatch != null) {
+                TransmitCallback sub = messageDispatch.getTransmitCallback();
+                broker.postProcessDispatch(messageDispatch);
+                if (sub != null) {
+                    sub.onFailure();
+                }
+                messageDispatch = null;
+            }
         } finally {
             if (messageDispatch != null) {
-                Runnable sub = messageDispatch.getTransmitCallback();
+                TransmitCallback sub = messageDispatch.getTransmitCallback();
                 broker.postProcessDispatch(messageDispatch);
                 if (sub != null) {
-                    sub.run();
+                    sub.onSuccess();
                 }
             }
         }
     }
 
+    @Override
     public boolean iterate() {
         try {
             if (pendingStop || stopping.get()) {
@@ -877,6 +955,7 @@ public class TransportConnection impleme
     /**
      * Returns the statistics for this connection
      */
+    @Override
     public ConnectionStatistics getStatistics() {
         return statistics;
     }
@@ -889,10 +968,12 @@ public class TransportConnection impleme
         this.messageAuthorizationPolicy = messageAuthorizationPolicy;
     }
 
+    @Override
     public boolean isManageable() {
         return manageable;
     }
 
+    @Override
     public void start() throws Exception {
         try {
             synchronized (this) {
@@ -931,6 +1012,7 @@ public class TransportConnection impleme
         }
     }
 
+    @Override
     public void stop() throws Exception {
         // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
         // as their lifecycle is handled elsewhere
@@ -949,6 +1031,7 @@ public class TransportConnection impleme
             }
             try {
                 stopTaskRunnerFactory.execute(new Runnable() {
+                    @Override
                     public void run() {
                         try {
                             Thread.sleep(waitTime);
@@ -985,6 +1068,7 @@ public class TransportConnection impleme
             }
             try {
                 stopTaskRunnerFactory.execute(new Runnable() {
+                    @Override
                     public void run() {
                         serviceLock.writeLock().lock();
                         try {
@@ -1039,10 +1123,10 @@ public class TransportConnection impleme
                 Command command = iter.next();
                 if (command.isMessageDispatch()) {
                     MessageDispatch md = (MessageDispatch) command;
-                    Runnable sub = md.getTransmitCallback();
+                    TransmitCallback sub = md.getTransmitCallback();
                     broker.postProcessDispatch(md);
                     if (sub != null) {
-                        sub.run();
+                        sub.onFailure();
                     }
                 }
             }
@@ -1109,6 +1193,7 @@ public class TransportConnection impleme
     /**
      * @return true if the Connection is slow
      */
+    @Override
     public boolean isSlow() {
         return slow;
     }
@@ -1132,6 +1217,7 @@ public class TransportConnection impleme
     /**
      * @return if after being marked, the Connection is still writing
      */
+    @Override
     public boolean isBlocked() {
         return blocked;
     }
@@ -1139,6 +1225,7 @@ public class TransportConnection impleme
     /**
      * @return true if the Connection is connected
      */
+    @Override
     public boolean isConnected() {
         return connected;
     }
@@ -1160,6 +1247,7 @@ public class TransportConnection impleme
     /**
      * @return true if the Connection is active
      */
+    @Override
     public boolean isActive() {
         return active;
     }
@@ -1178,10 +1266,12 @@ public class TransportConnection impleme
         return starting;
     }
 
+    @Override
     public synchronized boolean isNetworkConnection() {
         return networkConnection;
     }
 
+    @Override
     public boolean isFaultTolerantConnection() {
         return this.faultTolerantConnection;
     }
@@ -1201,9 +1291,9 @@ public class TransportConnection impleme
         this.pendingStop = pendingStop;
     }
 
+    @Override
     public Response processBrokerInfo(BrokerInfo info) {
         if (info.isSlaveBroker()) {
-            BrokerService bService = connector.getBrokerService();
             LOG.error(" Slave Brokers are no longer supported - slave trying to attach is:
" + info.getBrokerName());
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
@@ -1291,10 +1381,12 @@ public class TransportConnection impleme
         }
     }
 
+    @Override
     public String getRemoteAddress() {
         return transport.getRemoteAddress();
     }
 
+    @Override
     public String getConnectionId() {
         List<TransportConnectionState> connectionStates = listConnectionStates();
         for (TransportConnectionState cs : connectionStates) {
@@ -1306,6 +1398,7 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public void updateClient(ConnectionControl control) {
         if (isActive() && isBlocked() == false && isFaultTolerantConnection()
&& this.wireFormatInfo != null
                 && this.wireFormatInfo.getVersion() >= 6) {
@@ -1388,6 +1481,7 @@ public class TransportConnection impleme
         return protocolVersion.get();
     }
 
+    @Override
     public Response processControlCommand(ControlCommand command) throws Exception {
         String control = command.getCommand();
         if (control != null && control.equals("shutdown")) {
@@ -1396,10 +1490,12 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
         return null;
     }
 
+    @Override
     public Response processConnectionControl(ConnectionControl control) throws Exception
{
         if (control != null) {
             faultTolerantConnection = control.isFaultTolerant();
@@ -1407,10 +1503,12 @@ public class TransportConnection impleme
         return null;
     }
 
+    @Override
     public Response processConnectionError(ConnectionError error) throws Exception {
         return null;
     }
 
+    @Override
     public Response processConsumerControl(ConsumerControl control) throws Exception {
         ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
         broker.processConsumerControl(consumerExchange, control);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1432487&r1=1432486&r2=1432487&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Jan 12 18:13:27 2013
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.Broker;
@@ -43,6 +42,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.Response;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +88,7 @@ public abstract class PrefetchSubscripti
     /**
      * Allows a message to be pulled on demand by a client
      */
+    @Override
     public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
{
         // The slave should not deliver pull messages.
         // TODO: when the slave becomes a master, He should send a NULL message to all the
@@ -143,6 +144,7 @@ public abstract class PrefetchSubscripti
         }
     }
 
+    @Override
     public void add(MessageReference node) throws Exception {
         synchronized (pendingLock) {
             // The destination may have just been removed...
@@ -160,6 +162,7 @@ public abstract class PrefetchSubscripti
         dispatchPending();
     }
 
+    @Override
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws
Exception {
         synchronized(pendingLock) {
             try {
@@ -189,6 +192,7 @@ public abstract class PrefetchSubscripti
                         + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
     }
 
+    @Override
     public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws
Exception {
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched = false;
@@ -305,7 +309,7 @@ public abstract class PrefetchSubscripti
                     Destination nodeDest = (Destination) node.getRegionDestination();
                     if (node.isExpired()) {
                         if (broker.isExpired(node)) {
-                            Destination regionDestination = (Destination) nodeDest;
+                            Destination regionDestination = nodeDest;
                             regionDestination.messageExpired(context, this, node);
                         }
                         iter.remove();
@@ -500,6 +504,7 @@ public abstract class PrefetchSubscripti
         broker.getRoot().sendToDeadLetterQueue(context, node, this);
     }
 
+    @Override
     public int getInFlightSize() {
         return dispatched.size();
     }
@@ -509,6 +514,7 @@ public abstract class PrefetchSubscripti
      *
      * @return
      */
+    @Override
     public boolean isFull() {
         return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
     }
@@ -516,6 +522,7 @@ public abstract class PrefetchSubscripti
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
+    @Override
     public boolean isLowWaterMark() {
         return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize()
* .4);
     }
@@ -523,6 +530,7 @@ public abstract class PrefetchSubscripti
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
+    @Override
     public boolean isHighWaterMark() {
         return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize()
* .9);
     }
@@ -532,22 +540,27 @@ public abstract class PrefetchSubscripti
         return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
     }
 
+    @Override
     public int getPendingQueueSize() {
         return pending.size();
     }
 
+    @Override
     public int getDispatchedQueueSize() {
         return dispatched.size();
     }
 
+    @Override
     public long getDequeueCounter() {
         return dequeueCounter;
     }
 
+    @Override
     public long getDispatchedCounter() {
         return dispatchCounter;
     }
 
+    @Override
     public long getEnqueueCounter() {
         return enqueueCounter;
     }
@@ -613,8 +626,7 @@ public abstract class PrefetchSubscripti
                     setPendingBatchSize(pending, numberToDispatch);
                     int count = 0;
                     pending.reset();
-                    while (pending.hasNext() && !isFull()
-                            && count < numberToDispatch) {
+                    while (pending.hasNext() && !isFull() && count < numberToDispatch)
{
                         MessageReference node = pending.next();
                         if (node == null) {
                             break;
@@ -683,15 +695,29 @@ public abstract class PrefetchSubscripti
             }
         }
         if (info.isDispatchAsync()) {
-            md.setTransmitCallback(new Runnable() {
+            md.setTransmitCallback(new TransmitCallback() {
 
-                public void run() {
-                    // Since the message gets queued up in async dispatch,
-                    // we don't want to
-                    // decrease the reference count until it gets put on the
-                    // wire.
+                @Override
+                public void onSuccess() {
+                    // Since the message gets queued up in async dispatch, we don't want
to
+                    // decrease the reference count until it gets put on the wire.
                     onDispatch(node, message);
                 }
+
+                @Override
+                public void onFailure() {
+                    Destination nodeDest = (Destination) node.getRegionDestination();
+                    if (nodeDest != null) {
+                        if (node != QueueMessageReference.NULL_MESSAGE) {
+                            nodeDest.getDestinationStatistics().getDispatched().increment();
+                            nodeDest.getDestinationStatistics().getInflight().increment();
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace(info.getConsumerId() + " failed to dispatch: "
+ message.getMessageId() + " - "
+                                        + message.getDestination()  + ", dispatched: " +
dispatchCounter + ", inflight: " + dispatched.size());
+                            }
+                        }
+                    }
+                }
             });
             context.getConnection().dispatchAsync(md);
         } else {
@@ -728,6 +754,7 @@ public abstract class PrefetchSubscripti
      *
      * @param newPrefetch
      */
+    @Override
     public void updateConsumerPrefetch(int newPrefetch) {
         if (context != null && context.getConnection() != null && context.getConnection().isManageable())
{
             ConsumerControl cc = new ConsumerControl();

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1432487&r1=1432486&r2=1432487&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sat Jan 12 18:13:27 2013
@@ -41,6 +41,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.Response;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +97,7 @@ public class TopicSubscription extends A
         this.active=true;
     }
 
+    @Override
     public void add(MessageReference node) throws Exception {
         if (isDuplicate(node)) {
             return;
@@ -236,6 +238,7 @@ public class TopicSubscription extends A
         }
     }
 
+    @Override
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
         synchronized (matchedListMutex) {
             try {
@@ -256,6 +259,7 @@ public class TopicSubscription extends A
         }
     }
 
+    @Override
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
         // Handle the standard acknowledgment case.
         if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
@@ -299,6 +303,7 @@ public class TopicSubscription extends A
         throw new JMSException("Invalid acknowledgment: " + ack);
     }
 
+    @Override
     public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
 
         // The slave should not deliver pull messages.
@@ -320,6 +325,7 @@ public class TopicSubscription extends A
                 if (pull.getTimeout() > 0) {
                     scheduler.executeAfterDelay(new Runnable() {
 
+                        @Override
                         public void run() {
                             pullTimeout();
                         }
@@ -346,10 +352,12 @@ public class TopicSubscription extends A
         }
     }
 
+    @Override
     public int getPendingQueueSize() {
         return matched();
     }
 
+    @Override
     public int getDispatchedQueueSize() {
         return (int)(dispatchedCounter.get() - dequeueCounter.get());
     }
@@ -358,14 +366,17 @@ public class TopicSubscription extends A
         return maximumPendingMessages;
     }
 
+    @Override
     public long getDispatchedCounter() {
         return dispatchedCounter.get();
     }
 
+    @Override
     public long getEnqueueCounter() {
         return enqueueCounter.get();
     }
 
+    @Override
     public long getDequeueCounter() {
         return dequeueCounter.get();
     }
@@ -445,10 +456,12 @@ public class TopicSubscription extends A
 
     // Implementation methods
     // -------------------------------------------------------------------------
+    @Override
     public boolean isFull() {
         return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
     }
 
+    @Override
     public int getInFlightSize() {
         return getDispatchedQueueSize();
     }
@@ -456,6 +469,7 @@ public class TopicSubscription extends A
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
+    @Override
     public boolean isLowWaterMark() {
         return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
     }
@@ -463,6 +477,7 @@ public class TopicSubscription extends A
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
+    @Override
     public boolean isHighWaterMark() {
         return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
     }
@@ -507,6 +522,7 @@ public class TopicSubscription extends A
      *
      * @param newPrefetch
      */
+    @Override
     public void updateConsumerPrefetch(int newPrefetch) {
         if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
             ConsumerControl cc = new ConsumerControl();
@@ -567,9 +583,18 @@ public class TopicSubscription extends A
         }
         if (info.isDispatchAsync()) {
             if (node != null) {
-                md.setTransmitCallback(new Runnable() {
+                md.setTransmitCallback(new TransmitCallback() {
+
                     @Override
-                    public void run() {
+                    public void onSuccess() {
+                        Destination regionDestination = (Destination) node.getRegionDestination();
+                        regionDestination.getDestinationStatistics().getDispatched().increment();
+                        regionDestination.getDestinationStatistics().getInflight().increment();
+                        node.decrementReferenceCount();
+                    }
+
+                    @Override
+                    public void onFailure() {
                         Destination regionDestination = (Destination) node.getRegionDestination();
                         regionDestination.getDestinationStatistics().getDispatched().increment();
                         regionDestination.getDestinationStatistics().getInflight().increment();
@@ -612,6 +637,7 @@ public class TopicSubscription extends A
                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
     }
 
+    @Override
     public void destroy() {
         this.active=false;
         synchronized (matchedListMutex) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1432487&r1=1432486&r2=1432487&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Sat Jan 12 18:13:27 2013
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
  */
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
 
@@ -50,6 +49,7 @@ public class StoreDurableSubscriberCurso
     private PendingMessageCursor currentCursor;
     private final DurableTopicSubscription subscription;
     private boolean immediatePriorityDispatch = true;
+
     /**
      * @param broker Broker for this cursor
      * @param clientId clientId for this cursor
@@ -67,7 +67,7 @@ public class StoreDurableSubscriberCurso
         } else {
             this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
         }
-        
+
         this.nonPersistent.setMaxBatchSize(maxBatchSize);
         this.nonPersistent.setSystemUsage(systemUsage);
         this.storePrefetches.add(this.nonPersistent);
@@ -82,7 +82,7 @@ public class StoreDurableSubscriberCurso
         if (!isStarted()) {
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
-            	tsp.setMessageAudit(getMessageAudit());
+                tsp.setMessageAudit(getMessageAudit());
                 tsp.start();
             }
         }
@@ -108,7 +108,7 @@ public class StoreDurableSubscriberCurso
 
     /**
      * Add a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -134,7 +134,7 @@ public class StoreDurableSubscriberCurso
 
     /**
      * remove a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -173,7 +173,7 @@ public class StoreDurableSubscriberCurso
     /**
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
-     * 
+     *
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
      * @return true if recovery required
      */
@@ -290,6 +290,7 @@ public class StoreDurableSubscriberCurso
 
     @Override
     public synchronized void release() {
+        this.currentCursor = null;
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.release();
         }
@@ -326,7 +327,7 @@ public class StoreDurableSubscriberCurso
             tsp.setSystemUsage(usageManager);
         }
     }
-    
+
     @Override
     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
         super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
@@ -334,7 +335,7 @@ public class StoreDurableSubscriberCurso
             cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
         }
     }
-    
+
     @Override
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
@@ -350,7 +351,7 @@ public class StoreDurableSubscriberCurso
             cursor.setMaxAuditDepth(maxAuditDepth);
         }
     }
-    
+
     @Override
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
@@ -358,7 +359,7 @@ public class StoreDurableSubscriberCurso
             cursor.setEnableAudit(enableAudit);
         }
     }
-    
+
     @Override
     public  void setUseCache(boolean useCache) {
         super.setUseCache(useCache);
@@ -366,7 +367,7 @@ public class StoreDurableSubscriberCurso
             cursor.setUseCache(useCache);
         }
     }
-    
+
     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
         if (currentCursor == null || currentCursor.isEmpty()) {
             currentCursor = null;
@@ -384,7 +385,7 @@ public class StoreDurableSubscriberCurso
         }
         return currentCursor;
     }
-    
+
     @Override
     public String toString() {
         return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java?rev=1432487&r1=1432486&r2=1432487&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageDispatch.java Sat Jan 12 18:13:27 2013
@@ -17,11 +17,12 @@
 package org.apache.activemq.command;
 
 import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.TransmitCallback;
 
 /**
- * 
+ *
  * @openwire:marshaller code="21"
- * 
+ *
  */
 public class MessageDispatch extends BaseCommand {
 
@@ -34,13 +35,15 @@ public class MessageDispatch extends Bas
 
     protected transient long deliverySequenceId;
     protected transient Object consumer;
-    protected transient Runnable transmitCallback;
+    protected transient TransmitCallback transmitCallback;
     protected transient Throwable rollbackCause;
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public boolean isMessageDispatch() {
         return true;
     }
@@ -105,15 +108,16 @@ public class MessageDispatch extends Bas
         this.consumer = consumer;
     }
 
+    @Override
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processMessageDispatch(this);
     }
 
-    public Runnable getTransmitCallback() {
+    public TransmitCallback getTransmitCallback() {
         return transmitCallback;
     }
 
-    public void setTransmitCallback(Runnable transmitCallback) {
+    public void setTransmitCallback(TransmitCallback transmitCallback) {
         this.transmitCallback = transmitCallback;
     }
 

Added: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java?rev=1432487&view=auto
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java (added)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java Sat Jan 12 18:13:27 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+public interface TransmitCallback {
+
+    void onSuccess();
+
+    void onFailure();
+
+}

Propchange: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/TransmitCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message