activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: More improvements for AMQ-5043. Reworked the MQTT inactivity monitor so that it's more accurate.
Date Wed, 12 Feb 2014 18:26:13 GMT
Updated Branches:
  refs/heads/trunk e2a7d6af5 -> 6e68a3711


More improvements for AMQ-5043.  Reworked the MQTT inactivity monitor so that it's more accurate.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6e68a371
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6e68a371
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6e68a371

Branch: refs/heads/trunk
Commit: 6e68a3711561ed24bd78603453c81c5bbaf99f24
Parents: e2a7d6a
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Wed Feb 12 13:26:16 2014 -0500
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Wed Feb 12 13:26:16 2014 -0500

----------------------------------------------------------------------
 .../transport/mqtt/MQTTInactivityMonitor.java   | 104 +++++++------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  21 ++--
 2 files changed, 49 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6e68a371/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index c2f3041..adaf38b 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
 import org.apache.activemq.transport.AbstractInactivityMonitor;
 import org.apache.activemq.transport.InactivityIOException;
@@ -50,46 +49,53 @@ public class MQTTInactivityMonitor extends TransportFilter {
 
     private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
     private final AtomicBoolean failed = new AtomicBoolean(false);
-    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
     private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
 
     private final ReentrantLock sendLock = new ReentrantLock();
     private SchedulerTimerTask readCheckerTask;
 
-    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
-    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
+    private long readGraceTime = DEFAULT_CHECK_TIME_MILLS;
+    private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS;
     private boolean keepAliveResponseRequired;
     private MQTTProtocolConverter protocolConverter;
 
     private final Runnable readChecker = new Runnable() {
-        long lastRunTime;
+        long lastReceiveTime = System.currentTimeMillis();
 
         public void run() {
-            long now = System.currentTimeMillis();
-            long elapsed = (now - lastRunTime);
-
-            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
-                LOG.debug("" + elapsed + " ms elapsed since last read check.");
-            }
 
-            // Perhaps the timer executed a read check late.. and then executes
-            // the next read check on time which causes the time elapsed between
-            // read checks to be small..
-
-            // If less than 90% of the read check Time elapsed then abort this readcheck.
-            if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
-                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
+            long now = System.currentTimeMillis();
+            int currentCounter = next.getReceiveCounter();
+            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
+
+            // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
+            // should be sufficient to indicate the connection is still alive. If there were random data, or something
+            // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
+            // PINGREQ/RESP explicitly here
+            if (inReceive.get() || currentCounter != previousCounter) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Command received since last read check.");
+                }
+                lastReceiveTime = now;
                 return;
             }
 
-            lastRunTime = now;
-            readCheck();
+            if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
+                }
+                ASYNC_TASKS.execute(new Runnable() {
+                    public void run() {
+                        onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
+                    }
+                });
+            }
         }
     };
 
     private boolean allowReadCheck(long elapsed) {
-        return elapsed > (readCheckTime * 9 / 10);
+        return elapsed > (readGraceTime * 9 / 10);
     }
 
     public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
@@ -106,39 +112,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
         next.stop();
     }
 
-    final void readCheck() {
-        int currentCounter = next.getReceiveCounter();
-        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
-
-        // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
-        // should be sufficient to indicate the connection is still alive. If there were random data, or something
-        // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
-        // PINGREQ/RESP explicitly here
-        if (inReceive.get() || currentCounter != previousCounter) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("A receive is in progress");
-            }
-            return;
-        }
-        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
-            }
-            ASYNC_TASKS.execute(new Runnable() {
-                public void run() {
-                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
-                }
-            });
-        } else {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Message received since last read check, resetting flag: ");
-            }
-        }
-        commandReceived.set(false);
-    }
-
     public void onCommand(Object command) {
-        commandReceived.set(true);
         inReceive.set(true);
         try {
             transportListener.onCommand(command);
@@ -177,20 +151,20 @@ public class MQTTInactivityMonitor extends TransportFilter {
         }
     }
 
-    public long getReadCheckTime() {
-        return readCheckTime;
+    public long getReadGraceTime() {
+        return readGraceTime;
     }
 
-    public void setReadCheckTime(long readCheckTime) {
-        this.readCheckTime = readCheckTime;
+    public void setReadGraceTime(long readGraceTime) {
+        this.readGraceTime = readGraceTime;
     }
 
-    public long getInitialDelayTime() {
-        return initialDelayTime;
+    public long getReadKeepAliveTime() {
+        return readKeepAliveTime;
     }
 
-    public void setInitialDelayTime(long initialDelayTime) {
-        this.initialDelayTime = initialDelayTime;
+    public void setReadKeepAliveTime(long readKeepAliveTime) {
+        this.readKeepAliveTime = readKeepAliveTime;
     }
 
     public boolean isKeepAliveResponseRequired() {
@@ -224,11 +198,11 @@ public class MQTTInactivityMonitor extends TransportFilter {
             return;
         }
 
-        if (readCheckTime > 0) {
+        if (readKeepAliveTime > 0) {
             readCheckerTask = new SchedulerTimerTask(readChecker);
         }
 
-        if (readCheckTime > 0) {
+        if (readKeepAliveTime > 0) {
             monitorStarted.set(true);
             synchronized (AbstractInactivityMonitor.class) {
                 if (CHECKER_COUNTER == 0) {
@@ -236,8 +210,8 @@ public class MQTTInactivityMonitor extends TransportFilter {
                     READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
                 }
                 CHECKER_COUNTER++;
-                if (readCheckTime > 0) {
-                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
+                if (readKeepAliveTime > 0) {
+                    READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6e68a371/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index f7c3c1e..d19a0b3 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -30,7 +30,6 @@ import javax.jms.Message;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.*;
 import org.apache.activemq.store.PersistenceAdapterSupport;
-import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -51,7 +50,7 @@ public class MQTTProtocolConverter {
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
-    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
+    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
     private static final int DEFAULT_CACHE_SIZE = 5000;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
@@ -609,24 +608,24 @@ public class MQTTProtocolConverter {
         }
 
         try {
-
-            long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
-
             // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
+
             // we'll observe the server-side configured default value (note, no grace period)
-            if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
-                keepAliveMSWithGracePeriod = defaultKeepAlive;
+            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
+                keepAliveMS = defaultKeepAlive;
             }
 
+            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
+
             monitor.setProtocolConverter(this);
-            monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
-            monitor.setInitialDelayTime(keepAliveMS);
+            monitor.setReadKeepAliveTime(keepAliveMS);
+            monitor.setReadGraceTime(readGracePeriod);
             monitor.startMonitorThread();
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("MQTT Client " + getClientId() +
-                        " established heart beat of  " + keepAliveMSWithGracePeriod +
-                        " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
+                        " established heart beat of  " + keepAliveMS +
+                        " ms (" + keepAliveMS + "ms + " + readGracePeriod +
                         "ms grace period)");
             }
         } catch (Exception ex) {


Mime
View raw message