activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1182890 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/transport/ activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-core/src/test/java/org/apache/activemq/transport/stomp/ activemq-optional/sr...
Date Thu, 13 Oct 2011 14:54:16 GMT
Author: tabish
Date: Thu Oct 13 14:54:15 2011
New Revision: 1182890

URL: http://svn.apache.org/viewvc?rev=1182890&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3481

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Thu Oct 13 14:54:15 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -56,6 +57,8 @@ public abstract class AbstractInactivity
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
     private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
 
+    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
+
     private SchedulerTimerTask writeCheckerTask;
     private SchedulerTimerTask readCheckerTask;
 
@@ -140,11 +143,17 @@ public abstract class AbstractInactivity
                 public void run() {
                     if (monitorStarted.get()) {
                         try {
-                            KeepAliveInfo info = new KeepAliveInfo();
-                            info.setResponseRequired(keepAliveResponseRequired);
-                            oneway(info);
+                            // If we can't get the lock it means another write beat us into the
+                            // send and we don't need to heart beat now.
+                            if (sendLock.writeLock().tryLock()) {
+                                KeepAliveInfo info = new KeepAliveInfo();
+                                info.setResponseRequired(keepAliveResponseRequired);
+                                doOnewaySend(info);
+                            }
                         } catch (IOException e) {
                             onException(e);
+                        } finally {
+                            sendLock.writeLock().unlock();
                         }
                     }
                 };
@@ -175,7 +184,6 @@ public abstract class AbstractInactivity
                 public void run() {
                     onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
                 };
-
             });
         } else {
             if (LOG.isTraceEnabled()) {
@@ -195,11 +203,14 @@ public abstract class AbstractInactivity
             if (command.getClass() == KeepAliveInfo.class) {
                 KeepAliveInfo info = (KeepAliveInfo) command;
                 if (info.isResponseRequired()) {
+                    sendLock.readLock().lock();
                     try {
                         info.setResponseRequired(false);
                         oneway(info);
                     } catch (IOException e) {
                         onException(e);
+                    } finally {
+                        sendLock.readLock().unlock();
                     }
                 }
             } else {
@@ -212,39 +223,41 @@ public abstract class AbstractInactivity
                         }
                     }
                 }
-                synchronized (readChecker) {
-                    transportListener.onCommand(command);
-                }
+
+                transportListener.onCommand(command);
             }
         } finally {
-
             inReceive.set(false);
         }
     }
 
     public void oneway(Object o) throws IOException {
-        // Disable inactivity monitoring while processing a command.
-        // synchronize this method - its not synchronized
-        // further down the transport stack and gets called by more
-        // than one thread  by this class
-        synchronized(inSend) {
-            inSend.set(true);
-            try {
+        // To prevent the inactivity monitor from sending a message while we
+        // are performing a send we take a read lock.  The inactivity monitor
+        // sends its Heart-beat commands under a write lock.  This means that
+        // the MutexTransport is still responsible for synchronizing sends
+        this.sendLock.readLock().lock();
+        inSend.set(true);
+        try {
+            doOnewaySend(o);
+        } finally {
+            commandSent.set(true);
+            inSend.set(false);
+            this.sendLock.readLock().unlock();
+        }
+    }
 
-                if( failed.get() ) {
-                    throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
-                }
-                if (o.getClass() == WireFormatInfo.class) {
-                    synchronized (this) {
-                        processOutboundWireFormatInfo((WireFormatInfo) o);
-                    }
-                }
-                next.oneway(o);
-            } finally {
-                commandSent.set(true);
-                inSend.set(false);
+    // Must be called under lock, either read or write on sendLock.
+    private void doOnewaySend(Object command) throws IOException {
+        if( failed.get() ) {
+            throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
+        }
+        if (command.getClass() == WireFormatInfo.class) {
+            synchronized (this) {
+                processOutboundWireFormatInfo((WireFormatInfo) command);
             }
         }
+        next.oneway(command);
     }
 
     public void onException(IOException error) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Thu Oct 13 14:54:15 2011
@@ -17,44 +17,90 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * 
+ * Thread safe Transport Filter that serializes calls to and from the Transport Stack.
  */
 public class MutexTransport extends TransportFilter {
 
-    private final Object writeMutex = new Object();
+    private final ReentrantLock wreiteLock = new ReentrantLock();
+    private boolean syncOnCommand;
 
     public MutexTransport(Transport next) {
         super(next);
+        this.syncOnCommand = false;
     }
 
+    public MutexTransport(Transport next, boolean syncOnCommand) {
+        super(next);
+        this.syncOnCommand = syncOnCommand;
+    }
+
+    @Override
+    public void onCommand(Object command) {
+        if (syncOnCommand) {
+            wreiteLock.lock();
+            try {
+                transportListener.onCommand(command);
+            } finally {
+                wreiteLock.unlock();
+            }
+        } else {
+            transportListener.onCommand(command);
+        }
+    }
+
+    @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
-        synchronized (writeMutex) {
+        wreiteLock.lock();
+        try {
             return next.asyncRequest(command, null);
+        } finally {
+            wreiteLock.unlock();
         }
     }
 
+    @Override
     public void oneway(Object command) throws IOException {
-        synchronized (writeMutex) {
+        wreiteLock.lock();
+        try {
             next.oneway(command);
+        } finally {
+            wreiteLock.unlock();
         }
     }
 
+    @Override
     public Object request(Object command) throws IOException {
-        synchronized (writeMutex) {
+        wreiteLock.lock();
+        try {
             return next.request(command);
+        } finally {
+            wreiteLock.unlock();
         }
     }
 
+    @Override
     public Object request(Object command, int timeout) throws IOException {
-        synchronized (writeMutex) {
+        wreiteLock.lock();
+        try {
             return next.request(command, timeout);
+        } finally {
+            wreiteLock.unlock();
         }
     }
 
+    @Override
     public String toString() {
         return next.toString();
     }
 
+    public boolean isSyncOnCommand() {
+        return syncOnCommand;
+    }
+
+    public void setSyncOnCommand(boolean syncOnCommand) {
+        this.syncOnCommand = syncOnCommand;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Oct 13 14:54:15 2011
@@ -166,16 +166,7 @@ public class ProtocolConverter {
             command.setResponseRequired(true);
             resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
         }
-        stompTransport.asyncSendToActiveMQ(command);
-    }
-
-    protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) {
-        command.setCommandId(generateCommandId());
-        if (handler != null) {
-            command.setResponseRequired(true);
-            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
-        }
-        stompTransport.asyncSendToActiveMQ(command);
+        stompTransport.sendToActiveMQ(command);
     }
 
     protected void sendToStomp(StompFrame command) throws IOException {
@@ -301,7 +292,7 @@ public class ProtocolConverter {
         }
 
         message.onSend();
-        asyncSendToActiveMQ(message, createResponseHandler(command));
+        sendToActiveMQ(message, createResponseHandler(command));
     }
 
     protected void onStompNack(StompFrame command) throws ProtocolException {
@@ -338,7 +329,7 @@ public class ProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
                 if (ack != null) {
-                    asyncSendToActiveMQ(ack, createResponseHandler(command));
+                    sendToActiveMQ(ack, createResponseHandler(command));
                 } else {
                     throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
                 }
@@ -377,7 +368,7 @@ public class ProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                 if (ack != null) {
-                    asyncSendToActiveMQ(ack, createResponseHandler(command));
+                    sendToActiveMQ(ack, createResponseHandler(command));
                     acked = true;
                 }
             }
@@ -391,7 +382,7 @@ public class ProtocolConverter {
             for (StompSubscription sub : subscriptionsByConsumerId.values()) {
                 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                 if (ack != null) {
-                    asyncSendToActiveMQ(ack, createResponseHandler(command));
+                    sendToActiveMQ(ack, createResponseHandler(command));
                     acked = true;
                     break;
                 }
@@ -426,7 +417,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.BEGIN);
 
-        asyncSendToActiveMQ(tx, createResponseHandler(command));
+        sendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -453,7 +444,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
-        asyncSendToActiveMQ(tx, createResponseHandler(command));
+        sendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -482,7 +473,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.ROLLBACK);
 
-        asyncSendToActiveMQ(tx, createResponseHandler(command));
+        sendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@@ -550,7 +541,7 @@ public class ProtocolConverter {
 
         // dispatch can beat the receipt so send it early
         sendReceipt(command);
-        asyncSendToActiveMQ(consumerInfo, null);
+        sendToActiveMQ(consumerInfo, null);
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -579,7 +570,7 @@ public class ProtocolConverter {
             info.setClientId(durable);
             info.setSubscriptionName(durable);
             info.setConnectionId(connectionId);
-            asyncSendToActiveMQ(info, createResponseHandler(command));
+            sendToActiveMQ(info, createResponseHandler(command));
             return;
         }
 
@@ -587,7 +578,7 @@ public class ProtocolConverter {
 
             StompSubscription sub = this.subscriptions.remove(subscriptionId);
             if (sub != null) {
-                asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                 return;
             }
 
@@ -598,7 +589,7 @@ public class ProtocolConverter {
             for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
                 StompSubscription sub = iter.next();
                 if (destination != null && destination.equals(sub.getDestination())) {
-                    asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                     iter.remove();
                     return;
                 }
@@ -721,8 +712,8 @@ public class ProtocolConverter {
 
     protected void onStompDisconnect(StompFrame command) throws ProtocolException {
         checkConnected();
-        asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
-        asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
+        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
         connected.set(false);
     }
 
@@ -787,7 +778,7 @@ public class ProtocolConverter {
         ActiveMQDestination rc = tempDestinations.get(name);
         if( rc == null ) {
             rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
-            asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
             tempDestinations.put(name, rc);
         }
         return rc;
@@ -797,7 +788,7 @@ public class ProtocolConverter {
         ActiveMQDestination rc = tempDestinations.get(name);
         if( rc == null ) {
             rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
-            asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
             tempDestinations.put(name, rc);
             tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java Thu Oct 13 14:54:15 2011
@@ -21,6 +21,7 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.ServerSocketFactory;
@@ -29,6 +30,7 @@ import javax.net.SocketFactory;
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
@@ -62,6 +64,19 @@ public class StompNIOTransportFactory ex
     }
 
     @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        MutexTransport mutex = transport.narrow(MutexTransport.class);
+        if (mutex != null) {
+            mutex.setSyncOnCommand(true);
+        }
+
+        return transport;
+    }
+
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new StompTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java Thu Oct 13 14:54:15 2011
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.SslTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -46,6 +48,19 @@ public class StompSslTransportFactory ex
         return super.compositeConfigure(transport, format, options);
     }
 
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        MutexTransport mutex = transport.narrow(MutexTransport.class);
+        if (mutex != null) {
+            mutex.setSyncOnCommand(true);
+        }
+
+        return transport;
+    }
+
     public void setBrokerService(BrokerService brokerService) {
         this.brokerContext = brokerService.getBrokerContext();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Thu Oct 13 14:54:15 2011
@@ -75,7 +75,7 @@ public class StompSubscription {
             }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
-            protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
+            protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
 
         boolean ignoreTransformation = false;
@@ -115,7 +115,7 @@ public class StompSubscription {
 
         if (!unconsumedMessage.isEmpty()) {
             MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
-            protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
+            protocolConverter.getStompTransport().sendToActiveMQ(ack);
             unconsumedMessage.clear();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java Thu Oct 13 14:54:15 2011
@@ -29,8 +29,6 @@ public interface StompTransport {
 
     public void sendToActiveMQ(Command command);
 
-    public void asyncSendToActiveMQ(Command command);
-
     public void sendToStomp(StompFrame command) throws IOException;
 
     public X509Certificate[] getPeerCertificates();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Thu Oct 13 14:54:15 2011
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -28,8 +30,6 @@ import org.apache.activemq.wireformat.Wi
 
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- *
- *
  */
 public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 
@@ -50,6 +50,19 @@ public class StompTransportFactory exten
         this.brokerContext = brokerService.getBrokerContext();
     }
 
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+        transport = super.serverConfigure(transport, format, options);
+
+        MutexTransport mutex = transport.narrow(MutexTransport.class);
+        if (mutex != null) {
+            mutex.setSyncOnCommand(true);
+        }
+
+        return transport;
+    }
+
     @Override
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
         StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Thu Oct 13 14:54:15 2011
@@ -18,15 +18,11 @@ package org.apache.activemq.transport.st
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.thread.DefaultThreadPools;
-import org.apache.activemq.thread.Task;
-import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
@@ -50,58 +46,18 @@ public class StompTransportFilter extend
     private final ProtocolConverter protocolConverter;
     private StompInactivityMonitor monitor;
     private StompWireFormat wireFormat;
-    private final TaskRunner asyncSendTask;
-    private final ConcurrentLinkedQueue<Command> asyncCommands = new ConcurrentLinkedQueue<Command>();
 
     private boolean trace;
-    private int maxAsyncBatchSize = 25;
 
     public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
         this.protocolConverter = new ProtocolConverter(this, brokerContext);
 
-        asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
-            public boolean iterate() {
-                int iterations = 0;
-                TransportListener listener = transportListener;
-                if (listener != null) {
-                    while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) {
-                        Command command = asyncCommands.poll();
-                        if (command != null) {
-                            listener.onCommand(command);
-                        }
-                    }
-                }
-                return !asyncCommands.isEmpty();
-            }
-
-        }, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this));
-
         if (wireFormat instanceof StompWireFormat) {
             this.wireFormat = (StompWireFormat) wireFormat;
         }
     }
 
-    public void stop() throws Exception {
-        asyncSendTask.shutdown();
-
-        TransportListener listener = transportListener;
-        if (listener != null) {
-            Command commands[] = new Command[0];
-            asyncCommands.toArray(commands);
-            asyncCommands.clear();
-            for(Command command : commands) {
-                try {
-                    listener.onCommand(command);
-                } catch(Exception e) {
-                    break;
-                }
-            }
-        }
-
-        super.stop();
-    }
-
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
@@ -132,15 +88,6 @@ public class StompTransportFilter extend
         }
     }
 
-    public void asyncSendToActiveMQ(Command command) {
-        asyncCommands.offer(command);
-        try {
-            asyncSendTask.wakeup();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
     public void sendToStomp(StompFrame command) throws IOException {
         if (trace) {
             TRACE.trace("Sending: \n" + command);
@@ -183,12 +130,4 @@ public class StompTransportFilter extend
     public StompWireFormat getWireFormat() {
         return this.wireFormat;
     }
-
-    public int getMaxAsyncBatchSize() {
-        return maxAsyncBatchSize;
-    }
-
-    public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
-        this.maxAsyncBatchSize = maxAsyncBatchSize;
-    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Oct 13 14:54:15 2011
@@ -22,8 +22,10 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -1717,6 +1719,67 @@ public class StompTest extends Combinati
         assertEquals("Number of clients", expected, actual);
     }
 
+    public void testDisconnectDoesNotDeadlockBroker() throws Exception {
+        for (int i = 0; i < 20; ++i) {
+            doTestConnectionLeak();
+        }
+    }
+
+    private void doTestConnectionLeak() throws Exception {
+        stompConnect();
+
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        boolean gotMessage = false;
+        boolean gotReceipt = false;
+
+        char[] payload = new char[1024];
+        Arrays.fill(payload, 'A');
+
+        String test = "SEND\n" +
+                "x-type:DEV-3485\n"  +
+                "x-uuid:" + UUID.randomUUID() + "\n"  +
+                "persistent:true\n"  +
+                "receipt:" + UUID.randomUUID() + "\n" +
+                "destination:/queue/test.DEV-3485" +
+                "\n\n" +
+                new String(payload) + Stomp.NULL;
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        stompConnection.sendFrame(test);
+
+        // We only want one of them, to trigger the shutdown and potentially
+        // see a deadlock.
+        while (!gotMessage && !gotReceipt) {
+            frame = stompConnection.receiveFrame();
+
+            LOG.debug("Received the frame: " + frame);
+
+            if (frame.startsWith("RECEIPT")) {
+                gotReceipt = true;
+            } else if(frame.startsWith("MESSAGE")) {
+                gotMessage = true;
+            } else {
+                fail("Received a frame that we were not expecting.");
+            }
+        }
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        stompConnection.close();
+    }
+
     protected void waitForFrameToTakeEffect() throws InterruptedException {
         // bit of a dirty hack :)
         // another option would be to force some kind of receipt to be returned

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1182890&r1=1182889&r2=1182890&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Thu Oct 13 14:54:15 2011
@@ -101,9 +101,4 @@ class StompSocket extends TransportSuppo
     public StompWireFormat getWireFormat() {
         return this.wireFormat;
     }
-
-    @Override
-    public void asyncSendToActiveMQ(Command command) {
-        doConsume(command);
-    }
 }



Mime
View raw message