activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-934 Stomp Heart beat not being stopped in some cases
Date Thu, 26 Jan 2017 05:01:21 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 0e7fde72f -> a8a0c186d


ARTEMIS-934 Stomp Heart beat not being stopped in some cases

(cherry picked from commit f79b21e866539ca196eea67adc700c424f61fbfc)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a8a0c186
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a8a0c186
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a8a0c186

Branch: refs/heads/1.x
Commit: a8a0c186d15cb5e5d266fbb9fd7fcb4684e9be95
Parents: 0e7fde7
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jan 25 10:12:06 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Jan 26 00:01:10 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  9 ++++
 .../stomp/VersionedStompFrameHandler.java       |  3 ++
 .../stomp/v11/StompFrameHandlerV11.java         | 15 ++++++-
 .../util/AbstractStompClientConnection.java     | 11 ++++-
 .../integration/stomp/v11/StompV11Test.java     | 46 ++++++++++++++++++++
 .../integration/stomp/v11/StompV11TestBase.java |  2 +-
 6 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index ec010f7..899ffde 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -107,6 +107,10 @@ public final class StompConnection implements RemotingConnection {
       return false;
    }
 
+   public VersionedStompFrameHandler getStompVersionHandler() {
+      return frameHandler;
+   }
+
    public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
       StompFrame frame = null;
       try {
@@ -315,6 +319,11 @@ public final class StompConnection implements RemotingConnection {
       }
 
       ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+      if (frameHandler != null) {
+         frameHandler.disconnect();
+      }
+
       // Then call the listeners
       callFailureListeners(me);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 003865c..ea71a2c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -42,6 +42,9 @@ public abstract class VersionedStompFrameHandler {
    protected final ScheduledExecutorService scheduledExecutorService;
    protected final ExecutorFactory executorFactory;
 
+   protected void disconnect() {
+   }
+
    public static VersionedStompFrameHandler getHandler(StompConnection connection,
                                                        StompVersions version,
                                                        ScheduledExecutorService scheduledExecutorService,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 867cdd8..c6831cd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       decoder.init();
    }
 
+   public ActiveMQScheduledComponent getHeartBeater() {
+      return heartBeater;
+   }
+
    @Override
    public StompFrame onConnect(StompFrame frame) {
       StompFrame response = null;
@@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       //client receive ping
       long minAcceptInterval = Long.valueOf(params[1]);
 
-      heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
+      if (heartBeater == null) {
+         heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
+      }
    }
 
    @Override
    public StompFrame onDisconnect(StompFrame frame) {
+      disconnect();
+      return null;
+   }
+
+   @Override
+   protected void disconnect() {
       if (this.heartBeater != null) {
          heartBeater.shutdown();
       }
-      return null;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index 56338e4..4d15bb8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -62,7 +62,8 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
    protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
 
    protected boolean connected = false;
-   private int serverPingCounter;
+   protected int serverPingCounter;
+   protected ReaderThread readerThread;
 
    public AbstractStompClientConnection(String version, String host, int port) throws IOException {
       this.version = version;
@@ -85,7 +86,13 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       readBuffer = ByteBuffer.allocateDirect(10240);
       receiveList = new ArrayList<>(10240);
 
-      new ReaderThread().start();
+      readerThread = new ReaderThread();
+      readerThread.start();
+      //new ReaderThread().start();
+   }
+
+   public void killReaderThread() {
+      readerThread.stop();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 2bd15a1..00ce729 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -32,12 +32,16 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
+import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -2468,6 +2472,48 @@ public class StompV11Test extends StompV11TestBase {
       unsubscribe(conn, subId, receipt, false);
    }
 
+
+   @Test
+   public void testHeartBeat3() throws Exception {
+
+      connection.close();
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "500,500");
+      frame.addHeader("accept-version", "1.0,1.1");
+
+      ClientStompFrame reply = connV11.sendFrame(frame);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("500,500", reply.getHeader("heart-beat"));
+
+
+      System.out.println("========== start pinger!");
+
+      connV11.startPinger(100);
+
+
+      Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size());
+      StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next();
+      StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11) stompConnection.getStompVersionHandler();
+
+      Thread.sleep(1000);
+
+      //now check the frame size
+      int size = connV11.getServerPingNumber();
+
+      connV11.stopPinger();
+      ((AbstractStompClientConnection)connV11).killReaderThread();
+      Wait.waitFor(() -> {
+         return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
+      });
+
+      Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
+   }
+
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
       connV11.connect(defUser, defPass);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8a0c186/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
index c1bdccc..368c638 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java
@@ -58,7 +58,7 @@ public abstract class StompV11TestBase extends ActiveMQTestBase {
 
    private ConnectionFactory connectionFactory;
 
-   private Connection connection;
+   protected Connection connection;
 
    protected Session session;
 


Mime
View raw message