activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1458900 - in /activemq/trunk/activemq-mqtt/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/ test/resources/
Date Wed, 20 Mar 2013 15:56:12 GMT
Author: tabish
Date: Wed Mar 20 15:56:12 2013
New Revision: 1458900

URL: http://svn.apache.org/r1458900
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-4392

Unregister the connection on the Broker side so clients with the same client ID can reconnect.


Modified:
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
    activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
(original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
Wed Mar 20 15:56:12 2013
@@ -50,6 +50,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -105,7 +106,7 @@ class MQTTProtocolConverter {
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
-    private ConnectionInfo connectionInfo = new ConnectionInfo();
+    private final ConnectionInfo connectionInfo = new ConnectionInfo();
     private CONNECT connect;
     private String clientId;
     private long defaultKeepAlive;
@@ -159,7 +160,7 @@ class MQTTProtocolConverter {
             }
             case DISCONNECT.TYPE: {
                 LOG.debug("MQTT Client " + getClientId() + " disconnecting");
-                stopTransport();
+                onMQTTDisconnect();
                 break;
             }
             case SUBSCRIBE.TYPE: {
@@ -232,6 +233,7 @@ class MQTTProtocolConverter {
         connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            @Override
             public void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException {
 
                 if (response.isException()) {
@@ -250,6 +252,7 @@ class MQTTProtocolConverter {
 
                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    @Override
                     public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
 
                         if (response.isException()) {
@@ -272,6 +275,14 @@ class MQTTProtocolConverter {
         });
     }
 
+    void onMQTTDisconnect() throws MQTTProtocolException {
+        if (connected.get()) {
+            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
+            sendToActiveMQ(new ShutdownInfo(), null);
+        }
+        stopTransport();
+    }
+
     void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
         checkConnected();
         Topic[] topics = command.topics();
@@ -516,6 +527,7 @@ class MQTTProtocolConverter {
                         bytesOut.write(data, 0, read);
                     }
                     byteSequence = bytesOut.toByteSequence();
+                    bytesOut.close();
                 }
                 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
             }
@@ -555,7 +567,6 @@ class MQTTProtocolConverter {
             return;
         }
 
-
         long keepAliveMS = keepAliveSeconds * 1000;
 
         if (LOG.isDebugEnabled()) {
@@ -586,9 +597,6 @@ class MQTTProtocolConverter {
         } catch (Exception ex) {
             LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
         }
-
-
-
     }
 
     void handleException(Throwable exception, MQTTFrame command) {
@@ -636,6 +644,7 @@ class MQTTProtocolConverter {
             switch (command.qos()) {
                 case AT_LEAST_ONCE:
                     return new ResponseHandler() {
+                        @Override
                         public void onResponse(MQTTProtocolConverter converter, Response
response) throws IOException {
                             if (response.isException()) {
                                 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse)
response).getException());
@@ -648,6 +657,7 @@ class MQTTProtocolConverter {
                     };
                 case EXACTLY_ONCE:
                     return new ResponseHandler() {
+                        @Override
                         public void onResponse(MQTTProtocolConverter converter, Response
response) throws IOException {
                             if (response.isException()) {
                                 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse)
response).getException());

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
(original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
Wed Mar 20 15:56:12 2013
@@ -21,6 +21,7 @@ import java.security.cert.X509Certificat
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
@@ -58,6 +59,7 @@ public class MQTTTransportFilter extends
         }
     }
 
+    @Override
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
@@ -67,6 +69,7 @@ public class MQTTTransportFilter extends
         }
     }
 
+    @Override
     public void onCommand(Object command) {
         try {
             if (trace) {
@@ -81,6 +84,7 @@ public class MQTTTransportFilter extends
         }
     }
 
+    @Override
     public void sendToActiveMQ(Command command) {
         TransportListener l = transportListener;
         if (l != null) {
@@ -88,6 +92,7 @@ public class MQTTTransportFilter extends
         }
     }
 
+    @Override
     public void sendToMQTT(MQTTFrame command) throws IOException {
         if( !stopped.get() ) {
             if (trace) {
@@ -107,6 +112,7 @@ public class MQTTTransportFilter extends
         }
     }
 
+    @Override
     public X509Certificate[] getPeerCertificates() {
         if (next instanceof SslTransport) {
             X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
@@ -162,10 +168,7 @@ public class MQTTTransportFilter extends
      * The default = 1
      * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ
subscription
      */
-
     public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
         protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
     }
-
-
 }

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Wed Mar 20 15:56:12 2013
@@ -84,6 +84,28 @@ public class MQTTTest extends AbstractMQ
         }));
     }
 
+    @Test(timeout=300000)
+    public void testReuseConnection() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("Test-Client");
+
+        {
+            BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.disconnect();
+            Thread.sleep(1000);
+        }
+        {
+            BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.disconnect();
+            Thread.sleep(1000);
+        }
+    }
+
     @Override
     protected String getProtocolScheme() {
         return "mqtt";

Modified: activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties?rev=1458900&r1=1458899&r2=1458900&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-mqtt/src/test/resources/log4j.properties Wed Mar 20 15:56:12 2013
@@ -20,9 +20,9 @@
 #
 log4j.rootLogger=INFO, out, stdout
 
-#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE
+log4j.logger.org.apache.activemq.transport.mqtt=DEBUG
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE
 #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
 #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG



Mime
View raw message