activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1315 Client disconnection may cause consumer to hang
Date Wed, 02 Aug 2017 16:48:50 GMT
ARTEMIS-1315 Client disconnection may cause consumer to hang

When calling a consumer to receive a message with a timeout
(receive(long timeout)), if the consumer's buffer is empty, it sends
a 'forced delivery' request and then waits indefinitely, expecting the
server to send back a 'forced delivery' message if the queue is empty.

If the connection is disconnected because the arriving 'forced
delivery' message is corrupted, this 'forced delivery' message
never gets to the consumer. After the session is reconnected,
the consumer is never notified of this and keeps waiting.

To fix that, we can send a 'forced delivery' to the server right
after the session is reconnected.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/613b459c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/613b459c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/613b459c

Branch: refs/heads/master
Commit: 613b459c521427058bc1a3af8155c695fcb0df9f
Parents: c3f8321
Author: Howard Gao <howard.gao@gmail.com>
Authored: Wed Aug 2 09:49:54 2017 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Aug 2 12:48:43 2017 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientConsumerImpl.java    | 12 +++-
 .../client/impl/ClientConsumerInternal.java     |  2 +
 .../core/client/impl/ClientSessionImpl.java     |  2 +-
 .../core/impl/ActiveMQConsumerContext.java      |  1 +
 .../core/impl/ActiveMQSessionContext.java       | 15 ++++-
 .../spi/core/remoting/ConsumerContext.java      |  1 +
 .../spi/core/remoting/SessionContext.java       |  2 +-
 .../DisconnectOnCriticalFailureTest.java        | 61 ++++++++++++++++++++
 .../client/impl/LargeMessageBufferTest.java     |  5 ++
 9 files changed, 95 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 4b48caa..ccaa004 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -123,7 +124,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
{
 
    private boolean stopped = false;
 
-   private long forceDeliveryCount;
+   private AtomicLong forceDeliveryCount = new AtomicLong(0);
 
    private final ClientSession.QueueQuery queueInfo;
 
@@ -295,7 +296,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
{
                   logger.trace(this + "::Forcing delivery");
                }
                // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed
dead locks
-               sessionContext.forceDelivery(this, forceDeliveryCount++);
+               sessionContext.forceDelivery(this, forceDeliveryCount.getAndIncrement());
                callForceDelivery = false;
                deliveryForced = true;
                continue;
@@ -309,7 +310,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
{
 
                   // Need to check if forceDelivery was called at this call
                   // As we could be receiving a message that came from a previous call
-                  if (forcingDelivery && deliveryForced && seq == forceDeliveryCount
- 1) {
+                  if (forcingDelivery && deliveryForced && seq == forceDeliveryCount.get()
- 1) {
                      // forced delivery messages are discarded, nothing has been delivered
by the queue
                      resetIfSlowConsumer();
 
@@ -539,6 +540,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal
{
    }
 
    @Override
+   public long getForceDeliveryCount() {
+      return forceDeliveryCount.get();
+   }
+
+   @Override
    public SimpleString getFilterString() {
       return filterString;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
index cc5f3f1..177732e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
@@ -68,4 +68,6 @@ public interface ClientConsumerInternal extends ClientConsumer {
    void start();
 
    ClientSession.QueueQuery getQueueInfo();
+
+   long getForceDeliveryCount();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index f4b80cd..5f6b40b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1242,7 +1242,7 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
 
                      ClientConsumerInternal consumerInternal = entryx.getValue();
 
-                     sessionContext.recreateConsumerOnServer(consumerInternal);
+                     sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(),
started);
                   }
 
                   if ((!autoCommitAcks || !autoCommitSends) && workDone) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
index 65540ee..5af0e5e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
@@ -26,6 +26,7 @@ public class ActiveMQConsumerContext extends ConsumerContext {
       this.id = id;
    }
 
+   @Override
    public long getId() {
       return id;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index b8eb22c..fc43672 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -690,7 +690,7 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException
{
+   public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal, long consumerId,
boolean isSessionStarted) throws ActiveMQException {
       ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
 
       // We try and recreate any non durable queues, since they probably won't be there unless
@@ -717,6 +717,19 @@ public class ActiveMQSessionContext extends SessionContext {
          SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
1);
          sendPacketWithoutLock(sessionChannel, packet);
       }
+
+      //force a delivery to avoid a infinite waiting
+      //it can happen when the consumer sends a 'forced delivery' then
+      //waiting forever, while the connection is broken and the server's
+      //'forced delivery' message never gets to consumer. If session
+      //is reconnected, its consumer never knows and stays waiting.
+      //note this message will either be ignored by consumer (forceDeliveryCount
+      //doesn't match, which is fine) or be caught by consumer
+      //(in which case the consumer will wake up, thus avoid the infinite waiting).
+      if (isSessionStarted && consumerInternal.getForceDeliveryCount() > 0) {
+         SessionForceConsumerDelivery forceDel = new SessionForceConsumerDelivery(consumerId,
consumerInternal.getForceDeliveryCount() - 1);
+         sendPacketWithoutLock(sessionChannel, forceDel);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java
index f2c70cf..cd50cc5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConsumerContext.java
@@ -18,4 +18,5 @@ package org.apache.activemq.artemis.spi.core.remoting;
 
 public abstract class ConsumerContext {
 
+   public abstract long getId();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index fe3feb2..b123960 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -286,7 +286,7 @@ public abstract class SessionContext {
                                         boolean autoCommitAcks,
                                         boolean preAcknowledge) throws ActiveMQException;
 
-   public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal)
throws ActiveMQException;
+   public abstract void recreateConsumerOnServer(ClientConsumerInternal consumerInternal,
long consumerId, boolean isSessionStarted) throws ActiveMQException;
 
    public abstract void xaFailed(Xid xid) throws ActiveMQException;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
index fea3bf4..2be7d67 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
@@ -18,7 +18,9 @@
 package org.apache.activemq.artemis.tests.extras.byteman;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
@@ -30,6 +32,7 @@ import org.junit.runner.RunWith;
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -127,6 +130,64 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
       }
    }
 
+   @Test(timeout = 60000)
+   @BMRules(
+      rules = {@BMRule(
+         name = "Corrupt Decoding",
+         targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
+         targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
+   public void testClientDisconnectLarge() throws Exception {
+      Queue q1 = createQueue("queue1");
+      final Connection connection = nettyCf.createConnection();
+      final CountDownLatch latch = new CountDownLatch(1);
+      ServerLocator locator = ((ActiveMQConnectionFactory)nettyCf).getServerLocator();
+      int minSize = locator.getMinLargeMessageSize();
+      StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < minSize; i++) {
+         builder.append("a");
+      }
+
+      try {
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException e) {
+               latch.countDown();
+            }
+         });
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(q1);
+         TextMessage m = session.createTextMessage(builder.toString());
+         producer.send(m);
+         connection.start();
+
+         corruptPacket.set(true);
+         MessageConsumer consumer = session.createConsumer(q1);
+         Message lm = consumer.receive(2000);
+
+         //first receive won't crash because the packet
+         //is SESS_RECEIVE_LARGE_MSG
+         assertNotNull(lm);
+
+         //second receive will force server to send a
+         //"forced delivery" message, and will cause
+         //the exception to be thrown.
+         lm = consumer.receive(5000);
+         assertNull(lm);
+
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+      } finally {
+         corruptPacket.set(false);
+
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
    public static void doThrow(ActiveMQBuffer buff) {
       byte type = buff.getByte(buff.readerIndex());
       if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/613b459c/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
index a0bc01c..d78f070 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
@@ -786,6 +786,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
          return null;
       }
 
+      @Override
+      public long getForceDeliveryCount() {
+         return 0;
+      }
+
       /* (non-Javadoc)
        * @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
        */


Mime
View raw message