activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-611 refactor STOMP cxn TTL + heart-beat
Date Tue, 19 Jul 2016 15:06:20 GMT
ARTEMIS-611 refactor STOMP cxn TTL + heart-beat

Adds 3 new URI properties for STOMP acceptors to allow finer grained
configuration of heart-beat / connection-TTL behavior.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89e0c461
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89e0c461
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89e0c461

Branch: refs/heads/master
Commit: 89e0c461e5128768513943e60f6e3992f066f529
Parents: dc76e2a
Author: jbertram <jbertram@apache.org>
Authored: Fri Jul 15 08:36:24 2016 -0500
Committer: jbertram <jbertram@apache.org>
Committed: Mon Jul 18 17:10:05 2016 -0500

----------------------------------------------------------------------
 .../remoting/impl/netty/TransportConstants.java |   9 +
 .../core/protocol/stomp/StompConnection.java    |   6 +-
 .../protocol/stomp/StompProtocolManager.java    |   2 +-
 .../stomp/v11/StompFrameHandlerV11.java         | 130 +++++------
 .../core/remoting/server/RemotingService.java   |   6 +
 .../server/impl/RemotingServiceImpl.java        | 130 +++++++----
 .../en/protocols-interoperability.md            |  65 ++++--
 .../integration/stomp/StompOverHttpTest.java    |   4 +-
 .../stomp/StompOverWebsocketTest.java           |   4 +-
 .../tests/integration/stomp/StompTest.java      |  18 ++
 .../tests/integration/stomp/StompTestBase.java  |  88 ++++++--
 .../integration/stomp/v11/StompV11Test.java     | 214 +++++++++++++++++++
 .../integration/stomp/v11/StompV11TestBase.java |   2 +-
 13 files changed, 504 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index f373639..53dc204 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -187,6 +187,12 @@ public class TransportConstants {
 
    public static final String CONNECTION_TTL = "connectionTtl";
 
+   public static final String CONNECTION_TTL_MAX = "connectionTtlMax";
+
+   public static final String CONNECTION_TTL_MIN = "connectionTtlMin";
+
+   public static final String HEART_BEAT_TO_CONNECTION_TTL_MODIFIER = "heartBeatToConnectionTtlModifier";
+
    public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id";
 
    public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
@@ -230,6 +236,9 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
       allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
       allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
+      allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX);
+      allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN);
+      allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
       allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
       allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
       allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 1cfd0a5..5396c5b 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -269,7 +269,7 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   Acceptor getAcceptorUsed() {
+   public Acceptor getAcceptorUsed() {
       return acceptorUsed;
    }
 
@@ -720,4 +720,8 @@ public final class StompConnection implements RemotingConnection {
       return minLargeMessageSize;
    }
 
+   public StompProtocolManager getManager() {
+      return manager;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 7642e69..2c8751c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -55,7 +55,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
 /**
  * StompProtocolManager
  */
-class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
+public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 7f284dd..0eb1951 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -29,7 +29,10 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompDecoder;
 import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
 import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.utils.CertificateUtil;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -92,7 +95,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
                   response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
                }
                else {
-                  response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue());
+                  response.addHeader(Stomp.Headers.Connected.HEART_BEAT, Long.toString(heartBeater.serverPingPeriod) + "," + Long.toString(heartBeater.clientPingResponse));
                }
             }
          }
@@ -231,7 +234,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
    }
 
    private void startHeartBeat() {
-      if (heartBeater != null) {
+      if (heartBeater != null && heartBeater.serverPingPeriod != 0) {
          heartBeater.start();
       }
    }
@@ -242,31 +245,50 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       return frame;
    }
 
-   //server heart beat
-   //algorithm:
-   //(a) server ping: if server hasn't sent any frame within serverPing
-   //interval, send a ping.
-   //(b) accept ping: if server hasn't received any frame within
-   // 2*serverAcceptPing, disconnect!
+   /*
+    * HeartBeater functions:
+    * (a) server ping: if server hasn't sent any frame within serverPingPeriod interval, send a ping
+    * (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread
+    *     can deal with closing connections which go stale
+    */
    private class HeartBeater extends Thread {
 
       private static final int MIN_SERVER_PING = 500;
-      private static final int MIN_CLIENT_PING = 500;
 
-      long serverPing = 0;
-      long serverAcceptPing = 0;
+      long serverPingPeriod = 0;
+      long clientPingResponse;
       volatile boolean shutdown = false;
-      AtomicLong lastPingTime = new AtomicLong(0);
-      AtomicLong lastAccepted = new AtomicLong(0);
-      StompFrame pingFrame;
+      AtomicLong lastPingTimestamp = new AtomicLong(0);
+      ConnectionEntry connectionEntry;
 
-      private HeartBeater(long clientPing, long clientAcceptPing) {
-         if (clientPing != 0) {
-            serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING;
+      private HeartBeater(final long clientPing, final long clientAcceptPing) {
+         connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID());
+         clientPingResponse = clientPing;
+
+         String ttlMaxStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MAX);
+         long ttlMax = ttlMaxStr == null ? Long.MAX_VALUE : Long.valueOf(ttlMaxStr);
+
+         String ttlMinStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MIN);
+         long ttlMin = ttlMinStr == null ? 500 : Long.valueOf(ttlMinStr);
+
+         String heartBeatToTtlModifierStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
+         double heartBeatToTtlModifier = heartBeatToTtlModifierStr == null ? 2 : Double.valueOf(heartBeatToTtlModifierStr);
+
+         // The connection's TTL should be clientPing * 2, MIN_CLIENT_PING, or ttlMax set on the acceptor
+         long connectionTtl = (long) (clientPing * heartBeatToTtlModifier);
+         if (connectionTtl < ttlMin) {
+            connectionTtl = ttlMin;
+            clientPingResponse = (long) (ttlMin / heartBeatToTtlModifier);
          }
+         else if (connectionTtl > ttlMax) {
+            connectionTtl = ttlMax;
+            clientPingResponse = (long) (ttlMax / heartBeatToTtlModifier);
+         }
+         ActiveMQServerLogger.LOGGER.info("Setting TTL to: " + connectionTtl);
+         connectionEntry.ttl = connectionTtl;
 
          if (clientAcceptPing != 0) {
-            serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
+            serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
          }
       }
 
@@ -275,85 +297,32 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
          this.notify();
       }
 
-      public String getServerHeartBeatValue() {
-         return String.valueOf(serverPing) + "," + String.valueOf(serverAcceptPing);
-      }
-
       public void pinged() {
-         lastPingTime.set(System.currentTimeMillis());
+         lastPingTimestamp.set(System.currentTimeMillis());
       }
 
       @Override
       public void run() {
-         lastAccepted.set(System.currentTimeMillis());
-         pingFrame = createPingFrame();
-
          synchronized (this) {
             while (!shutdown) {
-               long dur1 = 0;
-               long dur2 = 0;
-
-               if (serverPing != 0) {
-                  dur1 = System.currentTimeMillis() - lastPingTime.get();
-                  if (dur1 >= serverPing) {
-                     lastPingTime.set(System.currentTimeMillis());
-                     connection.ping(pingFrame);
-                     dur1 = 0;
-                  }
+               long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get();
+               if (lastPingPeriod >= serverPingPeriod) {
+                  lastPingTimestamp.set(System.currentTimeMillis());
+                  connection.ping(createPingFrame());
+                  lastPingPeriod = 0;
                }
-
-               if (serverAcceptPing != 0) {
-                  dur2 = System.currentTimeMillis() - lastAccepted.get();
-
-                  if (dur2 > (2 * serverAcceptPing)) {
-                     connection.disconnect(false);
-                     shutdown = true;
-                     break;
-                  }
-               }
-
-               long waitTime1 = 0;
-               long waitTime2 = 0;
-
-               if (serverPing > 0) {
-                  waitTime1 = serverPing - dur1;
-               }
-
-               if (serverAcceptPing > 0) {
-                  waitTime2 = serverAcceptPing * 2 - dur2;
-               }
-
-               long waitTime = 10L;
-
-               if ((waitTime1 > 0) && (waitTime2 > 0)) {
-                  waitTime = Math.min(waitTime1, waitTime2);
-               }
-               else if (waitTime1 > 0) {
-                  waitTime = waitTime1;
-               }
-               else if (waitTime2 > 0) {
-                  waitTime = waitTime2;
-               }
-
                try {
-                  this.wait(waitTime);
+                  this.wait(serverPingPeriod - lastPingPeriod);
                }
                catch (InterruptedException e) {
                }
             }
          }
       }
-
-      public void pingAccepted() {
-         this.lastAccepted.set(System.currentTimeMillis());
-      }
    }
 
    @Override
    public void requestAccepted(StompFrame request) {
-      if (heartBeater != null) {
-         heartBeater.pingAccepted();
-      }
    }
 
    @Override
@@ -403,10 +372,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
          // either "[\r]\n"s or "\n"s)
          while (true) {
             if (workingBuffer[offset] == NEW_LINE) {
-               if (heartBeater != null) {
-                  //client ping
-                  heartBeater.pingAccepted();
-               }
+               //client ping
                nextChar = false;
             }
             else if (workingBuffer[offset] == CR) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 061e5a6..ea3107c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -99,4 +100,9 @@ public interface RemotingService {
     */
    Acceptor getAcceptor(String name);
 
+   Acceptor createAcceptor(String name, String uri) throws Exception;
+
+   Acceptor createAcceptor(TransportConfiguration transportConfiguration);
+
+   void destroyAcceptor(String name) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 3a073e9..1a8e32b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
+import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -77,8 +78,6 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
 
    private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class);
 
-   public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
-
    // Attributes ----------------------------------------------------
 
    private volatile boolean started = false;
@@ -119,6 +118,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
 
    private AtomicLong totalConnectionCount = new AtomicLong(0);
 
+   private long connectionTtlCheckInterval;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -163,6 +164,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       if (protocolManagerFactories != null) {
          loadProtocolManagerFactories(protocolManagerFactories);
       }
+
+      this.connectionTtlCheckInterval = config.getConnectionTtlCheckInterval();
    }
 
    private void setInterceptors(Configuration configuration) {
@@ -198,67 +201,94 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       threadPool = Executors.newCachedThreadPool(tFactory);
 
       for (TransportConfiguration info : acceptorsConfig) {
-         try {
-            AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
+         createAcceptor(info);
+      }
 
-            Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>();
+      /**
+       * Don't start the acceptors here.  Only start the acceptors at the every end of the start-up process to avoid
+       * race conditions. See {@link #startAcceptors()}.
+       */
 
-            @SuppressWarnings("deprecation")
-            String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams());
-            if (protocol != null) {
-               ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
-               locateProtocols(protocol, info, selectedProtocolFactories);
-            }
+      // This thread checks connections that need to be closed, and also flushes confirmations
+      failureCheckAndFlushThread = new FailureCheckAndFlushThread(connectionTtlCheckInterval);
 
-            String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams());
+      failureCheckAndFlushThread.start();
 
-            if (protocols != null) {
-               locateProtocols(protocols, info, selectedProtocolFactories);
-            }
+      started = true;
+   }
 
-            ClusterConnection clusterConnection = lookupClusterConnection(info);
+   @Override
+   public Acceptor createAcceptor(String name, String uri) throws Exception {
+      AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
 
-            // If empty: we get the default list
-            if (selectedProtocolFactories.isEmpty()) {
-               selectedProtocolFactories = protocolMap;
-            }
+      List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name);
 
-            Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
-            for (Map.Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) {
-               selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
-            }
+      return createAcceptor(configurations.get(0));
+   }
 
+   @Override
+   public Acceptor createAcceptor(TransportConfiguration info) {
+      Acceptor acceptor = null;
 
-            Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
+      try {
+         AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
 
-            if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) {
-               acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
-            }
+         Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>();
 
-            acceptors.put(info.getName(), acceptor);
+         @SuppressWarnings("deprecation")
+         String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams());
+         if (protocol != null) {
+            ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
+            locateProtocols(protocol, info, selectedProtocolFactories);
+         }
 
-            if (managementService != null) {
-               acceptor.setNotificationService(managementService);
+         String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams());
 
-               managementService.registerAcceptor(acceptor, info);
-            }
+         if (protocols != null) {
+            locateProtocols(protocols, info, selectedProtocolFactories);
          }
-         catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName());
+
+         ClusterConnection clusterConnection = lookupClusterConnection(info);
+
+         // If empty: we get the default list
+         if (selectedProtocolFactories.isEmpty()) {
+            selectedProtocolFactories = protocolMap;
          }
-      }
 
-      /**
-       * Don't start the acceptors here.  Only start the acceptors at the every end of the start-up process to avoid
-       * race conditions. See {@link #startAcceptors()}.
-       */
+         Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>();
+         for (Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) {
+            selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
+         }
 
-      // This thread checks connections that need to be closed, and also flushes confirmations
-      failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
 
-      failureCheckAndFlushThread.start();
+         acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
 
-      started = true;
+         if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) {
+            acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
+         }
+
+         acceptors.put(info.getName(), acceptor);
+
+         if (managementService != null) {
+            acceptor.setNotificationService(managementService);
+
+            managementService.registerAcceptor(acceptor, info);
+         }
+      }
+      catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName());
+      }
+
+      return acceptor;
+   }
+
+   @Override
+   public void destroyAcceptor(String name) throws Exception {
+      Acceptor acceptor = acceptors.get(name);
+      if (acceptor != null) {
+         acceptor.stop();
+         acceptors.remove(name);
+      }
    }
 
    @Override
@@ -423,6 +453,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       }
    }
 
+   public ConnectionEntry getConnectionEntry(final Object remotingConnectionID) {
+      ConnectionEntry entry = connections.get(remotingConnectionID);
+
+      if (entry != null) {
+         return entry;
+      }
+      else {
+         return null;
+      }
+   }
+
    @Override
    public RemotingConnection removeConnection(final Object remotingConnectionID) {
       ConnectionEntry entry = connections.remove(remotingConnectionID);
@@ -647,6 +688,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       @Override
       public void run() {
          while (!closed) {
+            ActiveMQServerLogger.LOGGER.info("Checking...");
             try {
                long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 5c19311..5b486d0 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -256,15 +256,6 @@ set).
 Apache ActiveMQ Artemis currently doesn't support virtual hosting, which means the
 'host' header in CONNECT fram will be ignored.
 
-#### Heart-beating
-
-Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat
-intervals. The minimum interval for both client and server heartbeats is
-500 milliseconds. That means if a client sends a CONNECT frame with
-heartbeat values lower than 500, the server will defaults the value to
-500 milliseconds regardless the values of the 'heart-beat' header in the
-frame.
-
 ### Mapping Stomp destinations to Apache ActiveMQ Artemis addresses and queues
 
 Stomp clients deals with *destinations* when sending messages and
@@ -278,7 +269,14 @@ specified destination is mapped to an address. When a Stomp client
 subscribes (or unsubscribes) for a destination (using a `SUBSCRIBE` or
 `UNSUBSCRIBE` frame), the destination is mapped to an Apache ActiveMQ Artemis queue.
 
-### STOMP and connection-ttl
+### STOMP heart-beating and connection-ttl
+
+Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat
+intervals. The minimum interval for both client and server heartbeats is
+500 milliseconds. That means if a client sends a CONNECT frame with
+heartbeat values lower than 500, the server will defaults the value to
+500 milliseconds regardless the values of the 'heart-beat' header in the
+frame.
 
 Well behaved STOMP clients will always send a DISCONNECT frame before
 closing their connections. In this case the server will clear up any
@@ -288,19 +286,50 @@ they crash the server will have no way of knowing immediately whether
 the client is still alive or not. STOMP connections therefore default to
 a connection-ttl value of 1 minute (see chapter on
 [connection-ttl](#connection-ttl) for more information. This value can
-be overridden using connection-ttl-override.
-
-If you need a specific connectionTtl for your stomp connections without
-affecting the connectionTtlOverride setting, you can configure your
-stomp acceptor with the "connectionTtl" property, which is used to set
-the ttl for connections that are created from that acceptor. For
-example:
+be overridden using the `connection-ttl-override` property or if you 
+need a specific connectionTtl for your stomp connections without
+affecting the broker-wide `connection-ttl-override` setting, you can 
+configure your stomp acceptor with the "connectionTtl" property, which 
+is used to set the ttl for connections that are created from that acceptor.
+For example:
 
     <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;connectionTtl=20000</acceptor>
 
 The above configuration will make sure that any stomp connection that is
 created from that acceptor will have its connection-ttl set to 20
-seconds.
+seconds. The `connectionTtl` set on an acceptor will take precedence over
+`connection-ttl-override`.
+
+Since Stomp 1.0 doesn't support heart-beating then all connections from
+Stomp 1.0 clients will have a connection TTL imposed upon them by the broker
+based on the aforementioned configuration options. Likewise, any Stomp 1.1
+or 1.2 clients that don't specify a heart-beat or disable heart-beating
+(e.g. by sending `0,0` in the `heart-beat` header) will have a connection
+TTL imposed upon them by the broker.
+
+For Stomp 1.1 and 1.2 clients which send a valid `heart-beat` header then
+their connection TTL will be set accordingly. However, the broker will not
+set the connection TTL to the same value as the specified in the `heart-beat`
+since even small network delays could then cause spurious disconnects. Instead,
+the value in the heart-beat will be multiplied by the `heartBeatConnectionTtlModifer`
+specified on the acceptor. The `heartBeatConnectionTtlModifer` is a decimal 
+value that defaults to 2.0 so for example, if a client sends a `heart-beat`
+frame of `1000,0` the the connection TTL will be set to `2000` so that the
+ping frames sent every 1000 milliseconds will have a sufficient cushion so as
+not to be considered late and trigger a disconnect.
+
+The minimum and maximum connection TTL allowed can also be specified on the
+acceptor via the `connectionTtlMin` and `connectionTtlMax` properties respectively.
+The default `connectionTtlMin` is 500 and the default `connectionTtlMax` is Java's
+`Long.MAX_VALUE` meaning there essentially is no max connection TTL by default.
+Keep in mind that the `heartBeatConnectionTtlModifer` is relevant here. For
+example, if a client sends a `heart-beat` header of `20000,0` and the acceptor
+is using a `connectionTtlMax` of `30000` and a default `heartBeatConnectionTtlModifer`
+of `2.0` then the connection TTL would be `40000` (i.e. `20000` * `2.0`) which would
+exceed the `connectionTtlMax`. In this case the server would respond to the client
+with a `heart-beat` header of `0,15000` (i.e. `30000` / `2.0`). As described
+previously, this is to make sure there is a sufficient cushion for the client
+heart-beats. The same kind of calculation is done for `connectionTtlMin`.
 
 > **Note**
 >

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
index 8e588f9..783f35f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
@@ -37,13 +37,13 @@ import io.netty.handler.codec.string.StringEncoder;
 public class StompOverHttpTest extends StompTest {
 
    @Override
-   protected void addChannelHandlers(SocketChannel ch) {
+   protected void addChannelHandlers(int index, SocketChannel ch) {
       ch.pipeline().addLast(new HttpRequestEncoder());
       ch.pipeline().addLast(new HttpResponseDecoder());
       ch.pipeline().addLast(new HttpHandler());
       ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
       ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler());
+      ch.pipeline().addLast(new StompClientHandler(index));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
index 39cd4f7..fa8c048 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
@@ -63,12 +63,12 @@ public class StompOverWebsocketTest extends StompTest {
    }
 
    @Override
-   protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException {
+   protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
       ch.pipeline().addLast("http-codec", new HttpClientCodec());
       ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
       ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null)));
       ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler());
+      ch.pipeline().addLast(new StompClientHandler(index));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 8155898..f28f5b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -52,6 +52,24 @@ public class StompTest extends StompTestBase {
    private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
 
    @Test
+   public void testConnectionTTL() throws Exception {
+      int index = 1;
+      int port = 61614;
+
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start();
+      createBootstrap(index, port);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(index, frame);
+      frame = receiveFrame(index, 10000);
+
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      Thread.sleep(5000);
+
+      assertChannelClosed(index);
+   }
+
+   @Test
    public void testSendManyMessages() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 9baf123..7f73e48 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -29,8 +29,10 @@ import java.io.IOException;
 import java.net.Socket;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -97,13 +99,15 @@ public abstract class StompTestBase extends ActiveMQTestBase {
 
    protected boolean autoCreateServer = true;
 
-   private Bootstrap bootstrap;
+   private List<Bootstrap> bootstraps = new ArrayList<>();
 
-   private Channel channel;
+//   private Channel channel;
 
-   private BlockingQueue<String> priorityQueue;
+   private List<BlockingQueue<String>> priorityQueues = new ArrayList<>();
 
-   private EventLoopGroup group;
+   private List<EventLoopGroup> groups = new ArrayList<>();
+
+   private List<Channel> channels = new ArrayList<>();
 
    // Implementation methods
    // -------------------------------------------------------------------------
@@ -111,7 +115,6 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      priorityQueue = new ArrayBlockingQueue<>(1000);
       if (autoCreateServer) {
          server = createServer();
          addServer(server.getActiveMQServer());
@@ -133,18 +136,27 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    }
 
    private void createBootstrap() {
-      group = new NioEventLoopGroup();
-      bootstrap = new Bootstrap();
-      bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
+      createBootstrap(0, port);
+   }
+
+   protected void createBootstrap(int port) {
+      createBootstrap(0, port);
+   }
+
+   protected void createBootstrap(final int index, int port) {
+      priorityQueues.add(index, new ArrayBlockingQueue<String>(1000));
+      groups.add(index, new NioEventLoopGroup());
+      bootstraps.add(index, new Bootstrap());
+      bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
-            addChannelHandlers(ch);
+            addChannelHandlers(index, ch);
          }
       });
 
       // Start the client.
       try {
-         channel = bootstrap.connect("localhost", port).sync().channel();
+         channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel());
          handshake();
       }
       catch (InterruptedException e) {
@@ -156,10 +168,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    protected void handshake() throws InterruptedException {
    }
 
-   protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException {
+   protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
       ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
       ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler());
+      ch.pipeline().addLast(new StompClientHandler(index));
    }
 
    protected void setUpAfterServer() throws Exception {
@@ -224,9 +236,13 @@ public abstract class StompTestBase extends ActiveMQTestBase {
       if (autoCreateServer) {
          connection.close();
 
-         if (group != null) {
-            channel.close();
-            group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
+         for (EventLoopGroup group : groups) {
+            if (group != null) {
+               for (Channel channel : channels) {
+                  channel.close();
+               }
+               group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
+            }
          }
       }
       super.tearDown();
@@ -234,8 +250,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
 
    protected void cleanUp() throws Exception {
       connection.close();
-      if (group != null) {
-         group.shutdown();
+      if (groups.get(0) != null) {
+         groups.get(0).shutdown();
       }
    }
 
@@ -244,7 +260,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    }
 
    protected void reconnect(long sleep) throws Exception {
-      group.shutdown();
+      groups.get(0).shutdown();
 
       if (sleep > 0) {
          Thread.sleep(sleep);
@@ -278,22 +294,38 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    }
 
    protected void assertChannelClosed() throws InterruptedException {
-      boolean closed = channel.closeFuture().await(5000);
+      assertChannelClosed(0);
+   }
+
+   protected void assertChannelClosed(int index) throws InterruptedException {
+      boolean closed = channels.get(index).closeFuture().await(5000);
       assertTrue("channel not closed", closed);
    }
 
    public void sendFrame(String data) throws Exception {
-      channel.writeAndFlush(data);
+      sendFrame(0, data);
+   }
+
+   public void sendFrame(int index, String data) throws Exception {
+      channels.get(index).writeAndFlush(data);
    }
 
    public void sendFrame(byte[] data) throws Exception {
+      sendFrame(0, data);
+   }
+
+   public void sendFrame(int index, byte[] data) throws Exception {
       ByteBuf buffer = Unpooled.buffer(data.length);
       buffer.writeBytes(data);
-      channel.writeAndFlush(buffer);
+      channels.get(index).writeAndFlush(buffer);
    }
 
    public String receiveFrame(long timeOut) throws Exception {
-      String msg = priorityQueue.poll(timeOut, TimeUnit.MILLISECONDS);
+      return receiveFrame(0, timeOut);
+   }
+
+   public String receiveFrame(int index, long timeOut) throws Exception {
+      String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS);
       return msg;
    }
 
@@ -344,6 +376,11 @@ public abstract class StompTestBase extends ActiveMQTestBase {
    }
 
    class StompClientHandler extends SimpleChannelInboundHandler<String> {
+      int index = 0;
+
+      StompClientHandler(int index) {
+         this.index = index;
+      }
 
       StringBuffer currentMessage = new StringBuffer("");
 
@@ -356,7 +393,12 @@ public abstract class StompTestBase extends ActiveMQTestBase {
             String actualMessage = fullMessage.substring(0, messageEnd);
             fullMessage = fullMessage.substring(messageEnd + 2);
             currentMessage = new StringBuffer("");
-            priorityQueue.add(actualMessage);
+            BlockingQueue queue = priorityQueues.get(index);
+            if (queue == null) {
+               queue = new ArrayBlockingQueue(1000);
+               priorityQueues.add(index, queue);
+            }
+            queue.add(actualMessage);
             if (fullMessage.length() > 0) {
                channelRead(ctx, fullMessage);
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 8e6fbb4..dfcd1b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -741,6 +741,220 @@ public class StompV11Test extends StompV11TestBase {
    }
 
    @Test
+   public void testHeartBeatToTTL() throws Exception {
+      ClientStompFrame frame;
+      ClientStompFrame reply;
+      int port = 61614;
+
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start();
+      StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+
+      //no heart beat at all if heat-beat absent
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      Thread.sleep(3000);
+
+      assertEquals(0, connection.getFrameQueueSize());
+
+      try {
+         connection.disconnect();
+         fail("Channel should be closed here already due to TTL");
+      }
+      catch (Exception e) {
+         // ignore
+      }
+
+      //no heart beat for (0,0)
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "0,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      IntegrationTestLogger.LOGGER.info("Reply: " + reply);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,0", reply.getHeader("heart-beat"));
+
+      Thread.sleep(3000);
+
+      assertEquals(0, connection.getFrameQueueSize());
+
+      try {
+         connection.disconnect();
+         fail("Channel should be closed here already due to TTL");
+      }
+      catch (Exception e) {
+         // ignore
+      }
+
+      //heart-beat (1,0), should receive a min client ping accepted by server
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "1,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,2500", reply.getHeader("heart-beat"));
+
+      Thread.sleep(7000);
+
+      //now server side should be disconnected because we didn't send ping for 2 sec
+      frame = connection.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will fail
+      try {
+         connection.sendFrame(frame);
+         fail("connection should have been destroyed by now");
+      }
+      catch (IOException e) {
+         //ignore
+      }
+
+      //heart-beat (1,0), start a ping, then send a message, should be ok.
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "1,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,2500", reply.getHeader("heart-beat"));
+
+      System.out.println("========== start pinger!");
+
+      connection.startPinger(2500);
+
+      Thread.sleep(7000);
+
+      //now server side should be disconnected because we didn't send ping for 2 sec
+      frame = connection.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will be ok
+      connection.sendFrame(frame);
+
+      connection.stopPinger();
+
+      connection.disconnect();
+
+      //heart-beat (20000,0), should receive a max client ping accepted by server
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "20000,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,5000", reply.getHeader("heart-beat"));
+
+      Thread.sleep(12000);
+
+      //now server side should be disconnected because we didn't send ping for 2 sec
+      frame = connection.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will fail
+      try {
+         connection.sendFrame(frame);
+         fail("connection should have been destroyed by now");
+      }
+      catch (IOException e) {
+         //ignore
+      }
+   }
+
+   @Test
+   public void testHeartBeatToConnectionTTLModifier() throws Exception {
+      ClientStompFrame frame;
+      ClientStompFrame reply;
+      StompClientConnection connection;
+      int port = 61614;
+
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
+
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "5000,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,5000", reply.getHeader("heart-beat"));
+
+      Thread.sleep(6000);
+
+      try {
+         connection.disconnect();
+         fail("Connection should be closed here already due to TTL");
+      }
+      catch (Exception e) {
+         // ignore
+      }
+
+      server.getActiveMQServer().getRemotingService().destroyAcceptor("test");
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
+
+      connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
+      frame = connection.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "5000,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      reply = connection.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,5000", reply.getHeader("heart-beat"));
+
+      Thread.sleep(6000);
+
+      connection.disconnect();
+   }
+
+   @Test
    public void testNack() throws Exception {
       connV11.connect(defUser, defPass);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
index d37d90b..f737455 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
@@ -103,7 +103,7 @@ public abstract class StompV11TestBase extends ActiveMQTestBase {
       params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
 
-      Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())).setConnectionTtlCheckInterval(500);
 
       ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
 


Mime
View raw message