activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5468
Date Mon, 05 Jan 2015 23:53:58 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 6c2e2f544 -> 4b7131ff8


https://issues.apache.org/jira/browse/AMQ-5468

Add a connect check to the inactivity monitor to account for opened
connections that might drop without being spotted.  If the connect
frame is lost, this can lead to connections that aren't fully opened
and won't be cleaned up until the broker detects that the socket has
failed.

By default the connection timer is set to 30 seconds; if no connect
frame is read by then, the connection is dropped.  The broker can be
configured via the 'transport.connectAttemptTimeout' URI option; a value
<= zero disables the check.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4b7131ff
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4b7131ff
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4b7131ff

Branch: refs/heads/trunk
Commit: 4b7131ff852e9c8bbf20c1c41ae0b6640b7d001a
Parents: 6c2e2f5
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jan 5 18:53:34 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jan 5 18:53:34 2015 -0500

----------------------------------------------------------------------
 .../transport/mqtt/MQTTInactivityMonitor.java   | 109 ++++++++++------
 .../transport/mqtt/MQTTProtocolConverter.java   |   5 +-
 .../transport/mqtt/MQTTTransportFilter.java     |  29 ++++-
 .../activemq/transport/mqtt/MQTTWireFormat.java |   7 ++
 .../transport/mqtt/MQTTConnectTest.java         | 124 +++++++++++++++++++
 .../transport/mqtt/MQTTTestSupport.java         |   9 ++
 6 files changed, 243 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index 22bbac5..28b6926 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -47,7 +47,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
     private static int CHECKER_COUNTER;
     private static Timer READ_CHECK_TIMER;
 
-    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
     private final AtomicBoolean failed = new AtomicBoolean(false);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
     private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
@@ -57,9 +56,34 @@ public class MQTTInactivityMonitor extends TransportFilter {
 
     private long readGraceTime = DEFAULT_CHECK_TIME_MILLS;
     private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS;
-    private boolean keepAliveResponseRequired;
     private MQTTProtocolConverter protocolConverter;
 
+    private long connectionTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT;
+    private SchedulerTimerTask connectCheckerTask;
+    private final Runnable connectChecker = new Runnable() {
+
+        private final long startTime = System.currentTimeMillis();
+
+        @Override
+        public void run() {
+
+            long now = System.currentTimeMillis();
+
+            if ((now - startTime) >= connectionTimeout && connectCheckerTask !=
null && !ASYNC_TASKS.isTerminating()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No CONNECT frame received in time for " + MQTTInactivityMonitor.this.toString()
+ "! Throwing InactivityIOException.");
+                }
+                ASYNC_TASKS.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        onException(new InactivityIOException("Channel was inactive for too
(>" + (readKeepAliveTime + readGraceTime) + ") long: "
+                            + next.getRemoteAddress()));
+                    }
+                });
+            }
+        }
+    };
+
     private final Runnable readChecker = new Runnable() {
         long lastReceiveTime = System.currentTimeMillis();
 
@@ -85,15 +109,15 @@ public class MQTTInactivityMonitor extends TransportFilter {
                 return;
             }
 
-            if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime &&
monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+            if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime &&
readCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString()
+ "! Throwing InactivityIOException.");
                 }
                 ASYNC_TASKS.execute(new Runnable() {
                     @Override
                     public void run() {
-                        onException(new InactivityIOException("Channel was inactive for too
(>" + (readKeepAliveTime + readGraceTime) + ") long: "
-                            + next.getRemoteAddress()));
+                        onException(new InactivityIOException("Channel was inactive for too
(>" +
+                                    (connectionTimeout) + ") long: " + next.getRemoteAddress()));
                     }
                 });
             }
@@ -107,12 +131,12 @@ public class MQTTInactivityMonitor extends TransportFilter {
     @Override
     public void start() throws Exception {
         next.start();
-        startMonitorThread();
     }
 
     @Override
     public void stop() throws Exception {
-        stopMonitorThread();
+        stopReadChecker();
+        stopConnectChecker();
         next.stop();
     }
 
@@ -149,7 +173,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
     @Override
     public void onException(IOException error) {
         if (failed.compareAndSet(false, true)) {
-            stopMonitorThread();
+            stopConnectChecker();
+            stopReadChecker();
             if (protocolConverter != null) {
                 protocolConverter.onTransportError();
             }
@@ -173,18 +198,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
         this.readKeepAliveTime = readKeepAliveTime;
     }
 
-    public boolean isKeepAliveResponseRequired() {
-        return this.keepAliveResponseRequired;
-    }
-
-    public void setKeepAliveResponseRequired(boolean value) {
-        this.keepAliveResponseRequired = value;
-    }
-
-    public boolean isMonitorStarted() {
-        return this.monitorStarted.get();
-    }
-
     public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
@@ -193,41 +206,61 @@ public class MQTTInactivityMonitor extends TransportFilter {
         return protocolConverter;
     }
 
-    synchronized void startMonitorThread() {
+    synchronized void startConnectChecker(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+        if (connectionTimeout > 0 && connectCheckerTask == null) {
+            connectCheckerTask = new SchedulerTimerTask(connectChecker);
 
-        // Not yet configured if this isn't set yet.
-        if (protocolConverter == null) {
-            return;
-        }
+            long connectionCheckInterval = Math.min(connectionTimeout, 1000);
 
-        if (monitorStarted.get()) {
-            return;
+            synchronized (AbstractInactivityMonitor.class) {
+                if (CHECKER_COUNTER == 0) {
+                    ASYNC_TASKS = createExecutor();
+                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
+                }
+                CHECKER_COUNTER++;
+                READ_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
+            }
         }
+    }
 
-        if (readKeepAliveTime > 0) {
+    synchronized void startReadChecker() {
+        if (readKeepAliveTime > 0 && readCheckerTask == null) {
             readCheckerTask = new SchedulerTimerTask(readChecker);
-        }
 
-        if (readKeepAliveTime > 0) {
-            monitorStarted.set(true);
             synchronized (AbstractInactivityMonitor.class) {
                 if (CHECKER_COUNTER == 0) {
                     ASYNC_TASKS = createExecutor();
                     READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
                 }
                 CHECKER_COUNTER++;
-                if (readKeepAliveTime > 0) {
-                    READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime);
-                }
+                READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime);
             }
         }
     }
 
-    synchronized void stopMonitorThread() {
-        if (monitorStarted.compareAndSet(true, false)) {
-            if (readCheckerTask != null) {
-                readCheckerTask.cancel();
+    synchronized void stopConnectChecker() {
+        if (connectCheckerTask != null) {
+            connectCheckerTask.cancel();
+            connectCheckerTask = null;
+
+            synchronized (AbstractInactivityMonitor.class) {
+                READ_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if (CHECKER_COUNTER == 0) {
+                    READ_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER = null;
+                    ThreadPoolUtils.shutdown(ASYNC_TASKS);
+                    ASYNC_TASKS = null;
+                }
             }
+        }
+    }
+
+    synchronized void stopReadChecker() {
+        if (readCheckerTask != null) {
+            readCheckerTask.cancel();
+            readCheckerTask = null;
 
             synchronized (AbstractInactivityMonitor.class) {
                 READ_CHECK_TIMER.purge();

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index c05c729..e821dbc 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -625,6 +625,9 @@ public class MQTTProtocolConverter {
             return;
         }
 
+        // Client has sent a valid CONNECT frame, we can stop the connect checker.
+        monitor.stopConnectChecker();
+
         long keepAliveMS = keepAliveSeconds * 1000;
 
         LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
@@ -642,7 +645,7 @@ public class MQTTProtocolConverter {
             monitor.setProtocolConverter(this);
             monitor.setReadKeepAliveTime(keepAliveMS);
             monitor.setReadGraceTime(readGracePeriod);
-            monitor.startMonitorThread();
+            monitor.startReadChecker();
 
             LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace
period)",
                       new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod
});

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 7c1566f..1cb6580 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -60,6 +60,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
     private MQTTInactivityMonitor monitor;
     private MQTTWireFormat wireFormat;
     private final AtomicBoolean stopped = new AtomicBoolean();
+    private long connectAttemptTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT;
 
     private boolean trace;
     private final Object sendLock = new Object();
@@ -149,8 +150,16 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
     }
 
     @Override
+    public void start() throws Exception {
+        if (monitor != null) {
+            monitor.startConnectChecker(getConnectAttemptTimeout());
+        }
+        super.start();
+    }
+
+    @Override
     public void stop() throws Exception {
-        if( stopped.compareAndSet(false, true) ) {
+        if (stopped.compareAndSet(false, true)) {
             super.stop();
         }
     }
@@ -203,6 +212,24 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
         protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
     }
 
+    /**
+     * @return the timeout value used to fail a connection if no CONNECT frame read.
+     */
+    public long getConnectAttemptTimeout() {
+        return connectAttemptTimeout;
+    }
+
+    /**
+     * Sets the timeout value used to fail a connection if no CONNECT frame is read
+     * in the given interval.
+     *
+     * @param connectTimeout
+     *        the connection frame received timeout value.
+     */
+    public void setConnectAttemptTimeout(long connectTimeout) {
+        this.connectAttemptTimeout = connectTimeout;
+    }
+
     public boolean getPublishDollarTopics() {
         return protocolConverter != null && protocolConverter.getPublishDollarTopics();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
index cc35020..70eaec8 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
@@ -36,9 +36,11 @@ import org.fusesource.mqtt.codec.MQTTFrame;
 public class MQTTWireFormat implements WireFormat {
 
     static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
+    static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
 
     private int version = 1;
 
+    @Override
     public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
@@ -47,12 +49,14 @@ public class MQTTWireFormat implements WireFormat {
         return baos.toByteSequence();
     }
 
+    @Override
     public Object unmarshal(ByteSequence packet) throws IOException {
         ByteArrayInputStream stream = new ByteArrayInputStream(packet);
         DataInputStream dis = new DataInputStream(stream);
         return unmarshal(dis);
     }
 
+    @Override
     public void marshal(Object command, DataOutput dataOut) throws IOException {
         MQTTFrame frame = (MQTTFrame) command;
         dataOut.write(frame.header());
@@ -74,6 +78,7 @@ public class MQTTWireFormat implements WireFormat {
         }
     }
 
+    @Override
     public Object unmarshal(DataInput dataIn) throws IOException {
         byte header = dataIn.readByte();
 
@@ -107,6 +112,7 @@ public class MQTTWireFormat implements WireFormat {
     /**
      * @param the version of the wire format
      */
+    @Override
     public void setVersion(int version) {
         this.version = version;
     }
@@ -114,6 +120,7 @@ public class MQTTWireFormat implements WireFormat {
     /**
      * @return the version of the wire format
      */
+    @Override
     public int getVersion() {
         return this.version;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
new file mode 100644
index 0000000..f2920e7
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that connection attempts that don't send a CONNECT frame will
+ * get cleaned up by the inactivity monitor.
+ */
+@RunWith(Parameterized.class)
+public class MQTTConnectTest extends MQTTTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTConnectTest.class);
+
+    private Socket connection;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"mqtt", false},
+                {"mqtt+ssl", true},
+                {"mqtt+nio", false},
+                {"mqtt+nio+ssl", true}
+            });
+    }
+
+    public MQTTConnectTest(String connectorScheme, boolean useSSL) {
+        super(connectorScheme, useSSL);
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Throwable e) {}
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    @Override
+    public String getProtocolConfig() {
+        return "transport.connectAttemptTimeout=2000";
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testInactivityMonitor() throws Exception {
+
+        Thread t1 = new Thread() {
+
+            @Override
+            public void run() {
+                try {
+                    connection = createConnection();
+                    connection.getOutputStream().write(0);
+                    connection.getOutputStream().flush();
+                } catch (Exception ex) {
+                    LOG.error("unexpected exception on connect/disconnect", ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+
+        t1.start();
+
+        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
+             }
+         }));
+
+        // and it should be closed due to inactivity
+        assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
+            }
+        }));
+
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    protected Socket createConnection() throws IOException {
+        if (isUseSSL()) {
+            return SSLSocketFactory.getDefault().createSocket("localhost", port);
+        } else {
+            return new Socket("localhost", port);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b7131ff/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
index 19aac52..4bf554a 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -212,6 +212,7 @@ public class MQTTTestSupport {
         StringBuilder connectorURI = new StringBuilder();
         connectorURI.append(getProtocolScheme());
         connectorURI.append("://0.0.0.0:").append(port);
+        String protocolConfig = getProtocolConfig();
         if (protocolConfig != null && !protocolConfig.isEmpty()) {
             connectorURI.append("?").append(protocolConfig);
         }
@@ -291,6 +292,14 @@ public class MQTTTestSupport {
         this.protocolScheme = scheme;
     }
 
+    public String getProtocolConfig() {
+        return protocolConfig;
+    }
+
+    public void setProtocolConfig(String config) {
+        this.protocolConfig = config;
+    }
+
     public boolean isUseSSL() {
         return this.useSSL;
     }


Mime
View raw message