activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [activemq-artemis] branch main updated: ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks to brusdev for the interceptor feature and test from ARTEMIS-2650
Date Wed, 02 Jun 2021 15:23:55 GMT
This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 815f383  ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks
to brusdev for the interceptor feature and test from ARTEMIS-2650
815f383 is described below

commit 815f383f9c19abacd0c56543ebc0e93e06dffbf9
Author: gtully <gary.tully@gmail.com>
AuthorDate: Wed Jun 2 00:09:02 2021 +0100

    ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks to brusdev for
the interceptor feature and test from ARTEMIS-2650
---
 .../core/protocol/openwire/OpenWireConnection.java |  25 ++--
 .../protocol/openwire/OpenWireInterceptor.java     |  27 ++++
 .../protocol/openwire/OpenWireProtocolManager.java |  42 ++++--
 .../openwire/OpenWireProtocolManagerFactory.java   |  10 +-
 .../core/protocol/openwire/amq/AMQConsumer.java    |  39 +++---
 .../openwire/OpenWireProtocolManagerTest.java      |   2 +-
 .../openwire/interop/GeneralInteropTest.java       | 145 ++++++++++++++++++++-
 7 files changed, 233 insertions(+), 57 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 8af0731..1216a52 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -131,7 +131,6 @@ import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.util.ByteSequence;
@@ -257,11 +256,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       if (state == null) {
          return null;
       }
-      ConnectionInfo info = state.getInfo();
-      if (info == null) {
-         return null;
-      }
-      return info;
+      return state.getInfo();
    }
 
    //tells the connection that
@@ -311,6 +306,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
             AuditLogger.setRemoteAddress(getRemoteAddress());
          }
 
+         if (this.protocolManager.invokeIncoming(command, this) != null) {
+            logger.debugf("Interceptor rejected OpenWire command: %s", command);
+            disconnect(true);
+            return;
+         }
+
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
 
@@ -496,6 +497,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
    }
 
    public void physicalSend(Command command) throws IOException {
+      if (this.protocolManager.invokeOutgoing(command, this) != null) {
+         return;
+      }
 
       if (logger.isTraceEnabled()) {
          tracePhysicalSend(transportConnection, command);
@@ -595,10 +599,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
             SessionState ss = state.getSessionState(id.getParentId());
             if (ss != null) {
                result.setProducerState(ss.getProducerState(id));
-               ProducerState producerState = ss.getProducerState(id);
-               if (producerState != null && producerState.getInfo() != null) {
-                  ProducerInfo info = producerState.getInfo();
-               }
             }
             producerExchanges.put(id, result);
          }
@@ -672,6 +672,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }
       }
+      if (fail) {
+         shutdown(fail);
+      }
    }
 
    @Override
@@ -808,8 +811,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
       try {
          physicalSend(command);
-      } catch (Exception e) {
-         return false;
       } catch (Throwable t) {
          return false;
       }
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java
new file mode 100644
index 0000000..7acc302
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java
@@ -0,0 +1,27 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.command.Command;
+
+public interface OpenWireInterceptor extends BaseInterceptor<Command> {
+
+}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 3aa5868..840a9a7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -35,7 +35,6 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
@@ -49,8 +48,8 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.impl.LRUCache;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -82,7 +81,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
 
 import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
 
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener
{
+public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, OpenWireInterceptor,
OpenWireConnection> implements ClusterTopologyListener {
 
    private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
 
@@ -94,7 +93,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
 
    private final OpenWireProtocolManagerFactory factory;
 
-   private OpenWireFormatFactory wireFactory;
+   private final OpenWireFormatFactory wireFactory;
 
    private boolean prefixPacketSize = true;
 
@@ -135,7 +134,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
-   protected class VirtualTopicConfig {
+   private final List<OpenWireInterceptor> incomingInterceptors = new ArrayList<>();
+   private final List<OpenWireInterceptor> outgoingInterceptors = new ArrayList<>();
+
+
+   protected static class VirtualTopicConfig {
       public int filterPathTerminus;
       public boolean selectorAware;
 
@@ -160,7 +163,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
    private final Map<DestinationFilter, VirtualTopicConfig> vtConsumerDestinationMatchers
= new HashMap<>();
    protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache
= new LRUCache();
 
-   public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer
server) {
+   public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer
server,
+                                  List<BaseInterceptor> incomingInterceptors,
+                                  List<BaseInterceptor> outgoingInterceptors) {
       this.factory = factory;
       this.server = server;
       this.wireFactory = new OpenWireFormatFactory();
@@ -170,6 +175,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       scheduledPool = server.getScheduledPool();
       this.wireFormat = (OpenWireFormat) wireFactory.createWireFormat();
 
+      updateInterceptors(incomingInterceptors, outgoingInterceptors);
+
       final ClusterManager clusterManager = this.server.getClusterManager();
 
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
@@ -245,14 +252,29 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
    }
 
    @Override
-   public ProtocolManagerFactory<Interceptor> getFactory() {
+   public ProtocolManagerFactory getFactory() {
       return factory;
    }
 
    @Override
-   public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
-                                  List<BaseInterceptor> outgoingInterceptors) {
-      // NO-OP
+   public void updateInterceptors(List incoming, List outgoing) {
+      this.incomingInterceptors.clear();
+      if (incoming != null) {
+         this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+      }
+
+      this.outgoingInterceptors.clear();
+      if (outgoing != null) {
+         this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
+      }
+   }
+
+   public String invokeIncoming(Command command, OpenWireConnection connection) {
+      return super.invokeInterceptors(this.incomingInterceptors, command, connection);
+   }
+
+   public String invokeOutgoing(Command command, OpenWireConnection connection) {
+      return super.invokeInterceptors(this.outgoingInterceptors, command, connection);
    }
 
    @Override
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
index d40e2ef..7368737 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
@@ -16,12 +16,10 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -30,7 +28,7 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
 import org.osgi.service.component.annotations.Component;
 
 @Component(service = ProtocolManagerFactory.class)
-public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
{
+public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<OpenWireInterceptor>
{
 
    public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
 
@@ -44,12 +42,12 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
                                                 final List<BaseInterceptor> incomingInterceptors,
                                                 List<BaseInterceptor> outgoingInterceptors)
throws Exception {
       BeanSupport.stripPasswords(parameters);
-      return BeanSupport.setData(new OpenWireProtocolManager(this, server), parameters);
+      return BeanSupport.setData(new OpenWireProtocolManager(this, server, incomingInterceptors,
outgoingInterceptors), parameters);
    }
 
    @Override
-   public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
{
-      return Collections.emptyList();
+   public List<OpenWireInterceptor> filterInterceptors(List<BaseInterceptor>
interceptors) {
+      return internalFilterInterceptors(OpenWireInterceptor.class, interceptors);
    }
 
    @Override
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d68fa91..c06227e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -293,40 +293,33 @@ public class AMQConsumer {
       }
 
       final int ackMessageCount = ack.getMessageCount();
-      acquireCredit(ackMessageCount);
-
       if (ack.isDeliveredAck()) {
+         acquireCredit(ackMessageCount);
          deliveredAcksCreditExtension += ackMessageCount;
          // our work is done
          return;
       }
 
-      // some sort of real ack, rebalance deliveredAcksCreditExtension
-      if (deliveredAcksCreditExtension > 0) {
-         deliveredAcksCreditExtension -= ackMessageCount;
-         if (deliveredAcksCreditExtension >= 0) {
-            currentWindow.addAndGet(-ackMessageCount);
-         }
-      }
+      final MessageId lastID = ack.getLastMessageId();
+      final MessageId startID = ack.getFirstMessageId() == null ? lastID : ack.getFirstMessageId();
 
-      final MessageId startID, lastID;
+      // if it's browse only, nothing to be acked
+      final boolean removeReferences = !serverConsumer.isBrowseOnly() && !serverConsumer.getQueue().isNonDestructive();
+      final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences,
reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
 
-      if (ack.getFirstMessageId() == null) {
-         startID = ack.getLastMessageId();
-         lastID = ack.getLastMessageId();
-      } else {
-         startID = ack.getFirstMessageId();
-         lastID = ack.getLastMessageId();
-      }
+      if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary())
{
 
-      boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only,
nothing to be acked, we just remove the lists
-      if (serverConsumer.getQueue().isNonDestructive()) {
-         removeReferences = false;
-      }
+         // valid match in delivered or browsing or temp - deal with credit
+         acquireCredit(ackMessageCount);
 
-      final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences,
reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
+         // some sort of real ack, rebalance deliveredAcksCreditExtension
+         if (deliveredAcksCreditExtension > 0) {
+            deliveredAcksCreditExtension -= ackMessageCount;
+            if (deliveredAcksCreditExtension >= 0) {
+               currentWindow.addAndGet(-ackMessageCount);
+            }
+         }
 
-      if (!ackList.isEmpty()) {
          if (ack.isExpiredAck()) {
             for (MessageReference ref : ackList) {
                ref.getQueue().expire(ref, serverConsumer);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
index e060cb3..4ee2a6c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
@@ -36,7 +36,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
 
    @Test
    public void testVtAutoConversion() {
-      underTest = new OpenWireProtocolManager(null, new DummyServer()) {
+      underTest = new OpenWireProtocolManager(null, new DummyServer(), null, null) {
          @Override
          public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination)
{
             if (lruCacheRef == null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
index 28715b1..42ca410 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
@@ -27,20 +27,31 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireInterceptor;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportListener;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -51,13 +62,10 @@ import org.junit.Test;
  */
 public class GeneralInteropTest extends BasicOpenWireTest {
 
-   private ServerLocator locator;
-
    @Before
    @Override
    public void setUp() throws Exception {
       super.setUp();
-      locator = this.createInVMNonHALocator();
    }
 
    @Test
@@ -194,6 +202,103 @@ public class GeneralInteropTest extends BasicOpenWireTest {
 
    @Test
    public void testFailoverReceivingFromCore() throws Exception {
+
+      /**
+       * to get logging to stdout from failover client
+       *  org.slf4j.impl.SimpleLoggerFactory simpleLoggerFactory = new SimpleLoggerFactory();
+       * ((SimpleLogger)simpleLoggerFactory.getLogger(FailoverTransport.class.getName())).setLevel(SimpleLogger.TRACE);
+       */
+
+      final String text = "HelloWorld";
+      final int prefetchSize = 10;
+
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString dlq = new SimpleString("DLQ1");
+      server.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
+      server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDeadLetterAddress(dla));
+
+      sendMultipleTextMessagesUsingCoreJms(queueName, text, 100);
+
+      String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT
+         + ")?randomize=false&timeout=400&reconnectDelay=500" +
+         "&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500"
+
+         "&nested.wireFormat.maxInactivityDurationInitalDelay=500" +
+         "&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400";
+
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
+      connectionFactory.setSendAcksAsync(false);
+      connectionFactory.setOptimizeAcknowledge(false);
+      connectionFactory.getPrefetchPolicy().setAll(prefetchSize);
+
+      Connection connection = connectionFactory.createConnection();
+      try {
+         connection.setClientID("test.consumer.queue." + queueName);
+         connection.start();
+
+         Message message = null;
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         QueueControl queueControl = (QueueControl)server.getManagementService().
+            getResource(ResourceNames.QUEUE + queueName);
+
+         QueueControl dlqControl = (QueueControl)server.getManagementService().
+            getResource(ResourceNames.QUEUE + dlq.toString());
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         assertTrue(message instanceof TextMessage);
+         assertEquals(text + 0, ((TextMessage)message).getText());
+         message.acknowledge();
+
+         Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
+         Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000,
100);
+
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         assertTrue(message instanceof TextMessage);
+         assertEquals(text + 1, ((TextMessage)message).getText());
+
+         // client won't get a reply to the ack command, just a disconnect and will replay
the ack on reconnect
+         server.getRemotingService().addIncomingInterceptor(new OpenWireInterceptor() {
+            @Override
+            public boolean intercept(Command packet, RemotingConnection connection) throws
ActiveMQException {
+               if (packet.isMessageAck()) {
+                  server.getRemotingService().removeIncomingInterceptor(this);
+                  return false;
+               }
+               return true;
+            }
+         });
+
+         message.acknowledge();
+
+         // after a response to the replay....
+         // the message should be redelivered and pending for the replayed ack... hence it
gets acked ok.
+         // the real delivery gets suppressed as a duplicate by the message audit and poison
acked
+         // but there is a race between client failover reconnect and server dispatch to
a new consumer
+         // if redispatch has not happened, the replayed ack is dropped and the posion ack
will match and try and dlq
+         Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged()
== 1
+            || dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged()
== 2, 3000, 100);
+         Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000,
100);
+
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         assertTrue(message instanceof TextMessage);
+         assertEquals(text + 2, ((TextMessage)message).getText());
+         message.acknowledge();
+
+         Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged()
== 2
+            || dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged()
== 3, 3000, 100);
+         Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 30000,
100);
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testFailoverReceivingFromCoreWithAckAfterInterrupt() throws Exception {
       final int prefetchSize = 10;
       final String text = "HelloWorld";
 
@@ -227,13 +332,43 @@ public class GeneralInteropTest extends BasicOpenWireTest {
          Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
          Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000,
100);
 
-         //Force a disconnection.
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         assertTrue(message instanceof TextMessage);
+         assertEquals(text + 1, ((TextMessage)message).getText());
+
+         CountDownLatch interrupted = new CountDownLatch(1);
+         ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+            }
+
+            @Override
+            public void onException(IOException error) {
+            }
+
+            @Override
+            public void transportInterupted() {
+               interrupted.countDown();
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+         });
+
+         //Force a disconnection that will result in duplicate ack
          for (ServerSession serverSession : server.getSessions()) {
             if (session.toString().contains(serverSession.getName())) {
                serverSession.getRemotingConnection().fail(new ActiveMQDisconnectedException());
             }
          }
 
+         assertTrue(interrupted.await(10, TimeUnit.SECONDS));
+
+         // ack will be dropped
+         message.acknowledge();
+
          Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
          Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000,
100);
 

Mime
View raw message