activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6346
Date Tue, 05 Jul 2016 18:19:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 7ddfa97d0 -> fda982dcc


https://issues.apache.org/jira/browse/AMQ-6346

Prevent concurrent access to the MQTT protocol handlers which can lead
to a tansport level deadlock

(cherry picked from commit 96494f74c7142c3396f17696f345c2355c16a61c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1dfd0eeb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1dfd0eeb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1dfd0eeb

Branch: refs/heads/activemq-5.13.x
Commit: 1dfd0eeb60921243bec8245edb16c07ca98c1857
Parents: 7ddfa97
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Jul 5 17:47:49 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jul 5 18:18:14 2016 +0000

----------------------------------------------------------------------
 .../activemq/transport/ws/AbstractMQTTSocket.java  | 12 +++++++++++-
 .../activemq/transport/ws/jetty9/MQTTSocket.java   | 17 ++++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1dfd0eeb/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
index 65d12c2..c293811 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
 import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -34,6 +35,7 @@ import org.fusesource.mqtt.codec.MQTTFrame;
 
 public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport,
BrokerServiceAware {
 
+    protected ReentrantLock protocolLock = new ReentrantLock();
     protected volatile MQTTProtocolConverter protocolConverter = null;
     protected MQTTWireFormat wireFormat = new MQTTWireFormat();
     protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this,
wireFormat);
@@ -50,16 +52,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements
MQT
 
     @Override
     public void oneway(Object command) throws IOException {
+        protocolLock.lock();
         try {
             getProtocolConverter().onActiveMQCommand((Command)command);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
         }
     }
 
     @Override
     public void sendToActiveMQ(Command command) {
-        doConsume(command);
+        protocolLock.lock();
+        try {
+            doConsume(command);
+        } finally {
+            protocolLock.unlock();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/1dfd0eeb/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
index b2dd9be..f19e4d2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.transport.ws.AbstractMQTTSocket;
 import org.apache.activemq.util.ByteSequence;
@@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
 
+    private final int ORDERLY_CLOSE_TIMEOUT = 10;
     private Session session;
 
     public MQTTSocket(String remoteAddress) {
@@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener
             }
         }
 
-        receiveCounter += length;
-
+        protocolLock.lock();
         try {
+            receiveCounter += length;
             MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset,
length));
             getProtocolConverter().onMQTTCommand(frame);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
+        } finally {
+            protocolLock.unlock();
         }
     }
 
     @Override
     public void onWebSocketClose(int arg0, String arg1) {
         try {
-            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+            if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS))
{
+                LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1);
+                getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+            }
         } catch (Exception e) {
             LOG.warn("Failed to close WebSocket", e);
+        } finally {
+            if (protocolLock.isHeldByCurrentThread()) {
+                protocolLock.unlock();
+            }
         }
     }
 


Mime
View raw message