activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-934 Stomp Heart beat not being stopped in some cases
Date Thu, 26 Jan 2017 05:00:52 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 98f6fa760 -> fb4bc063f


ARTEMIS-934 Stomp Heart beat not being stopped in some cases


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f79b21e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f79b21e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f79b21e8

Branch: refs/heads/master
Commit: f79b21e866539ca196eea67adc700c424f61fbfc
Parents: 98f6fa7
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jan 25 10:12:06 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jan 25 12:17:47 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  9 ++++
 .../stomp/VersionedStompFrameHandler.java       |  3 ++
 .../stomp/v11/StompFrameHandlerV11.java         | 15 ++++++-
 .../util/AbstractStompClientConnection.java     |  8 +++-
 .../integration/stomp/v11/StompV11Test.java     | 47 ++++++++++++++++++++
 5 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 0eb81b9..f72a73e 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -110,6 +110,10 @@ public final class StompConnection implements RemotingConnection {
       return false;
    }
 
+   public VersionedStompFrameHandler getStompVersionHandler() {
+      return frameHandler;
+   }
+
    public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
       StompFrame frame = null;
       try {
@@ -343,6 +347,11 @@ public final class StompConnection implements RemotingConnection {
       }
 
       ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+      if (frameHandler != null) {
+         frameHandler.disconnect();
+      }
+
       // Then call the listeners
       callFailureListeners(me);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 02facd6..673c86e 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -43,6 +43,9 @@ public abstract class VersionedStompFrameHandler {
    protected final ScheduledExecutorService scheduledExecutorService;
    protected final ExecutorFactory executorFactory;
 
+   protected void disconnect() {
+   }
+
    public static VersionedStompFrameHandler getHandler(StompConnection connection,
                                                        StompVersions version,
                                                        ScheduledExecutorService scheduledExecutorService,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 867cdd8..c6831cd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       decoder.init();
    }
 
+   public ActiveMQScheduledComponent getHeartBeater() {
+      return heartBeater;
+   }
+
    @Override
    public StompFrame onConnect(StompFrame frame) {
       StompFrame response = null;
@@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler
implements
       //client receive ping
       long minAcceptInterval = Long.valueOf(params[1]);
 
-      heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(),
minPingInterval, minAcceptInterval);
+      if (heartBeater == null) {
+         heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(),
minPingInterval, minAcceptInterval);
+      }
    }
 
    @Override
    public StompFrame onDisconnect(StompFrame frame) {
+      disconnect();
+      return null;
+   }
+
+   @Override
+   protected void disconnect() {
       if (this.heartBeater != null) {
          heartBeater.shutdown();
       }
-      return null;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index d8a487e..fa1ec73 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -45,6 +45,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
    protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
    protected boolean connected = false;
    protected int serverPingCounter;
+   protected ReaderThread readerThread;
 
    public AbstractStompClientConnection(String version, String host, int port) throws IOException
{
       this.version = version;
@@ -67,7 +68,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       readBuffer = ByteBuffer.allocateDirect(10240);
       receiveList = new ArrayList<>(10240);
 
-      new ReaderThread().start();
+      readerThread = new ReaderThread();
+      readerThread.start();
+   }
+
+   public void killReaderThread() {
+      readerThread.stop();
    }
 
    private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws
IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 01f1cf8..eb055f1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -31,13 +31,17 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
+import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -2113,6 +2117,49 @@ public class StompV11Test extends StompTestBase {
       conn.disconnect();
    }
 
+
+   @Test
+   public void testHeartBeat3() throws Exception {
+
+      connection.close();
+      ClientStompFrame frame = conn.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "500,500");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      ClientStompFrame reply = conn.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("500,500", reply.getHeader("heart-beat"));
+
+
+      System.out.println("========== start pinger!");
+
+      conn.startPinger(100);
+
+
+      Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size());
+      StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next();
+      StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11)stompConnection.getStompVersionHandler();
+
+      Thread.sleep(1000);
+
+      //now check the frame size
+      int size = conn.getServerPingNumber();
+
+      conn.stopPinger();
+      ((AbstractStompClientConnection)conn).killReaderThread();
+      Wait.waitFor(() -> {
+         return server.getActiveMQServer().getRemotingService().getConnections().size() ==
0;
+      });
+
+      Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
+   }
+
+
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect)
throws Exception {
       conn.connect(defUser, defPass);
 


Mime
View raw message