activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-1081 Implementing AMQP UndeliverableHere
Date Wed, 29 Mar 2017 01:32:51 GMT
ARTEMIS-1081 Implementing AMQP UndeliverableHere


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1f4473e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1f4473e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1f4473e8

Branch: refs/heads/master
Commit: 1f4473e8d77ccc724a16921b5e1207b565ab7c4c
Parents: 746220e
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Mar 28 17:59:41 2017 -0400
Committer: Justin Bertram <jbertram@apache.org>
Committed: Tue Mar 28 20:32:28 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  7 +++++++
 .../protocol/amqp/broker/AMQPMessage.java       | 22 ++++++++++++++++++++
 .../amqp/proton/ProtonServerSenderContext.java  | 10 +++++++--
 .../activemq/artemis/core/server/Consumer.java  |  3 +++
 .../core/server/cluster/impl/BridgeImpl.java    |  9 ++++++++
 .../core/server/cluster/impl/Redistributor.java |  9 ++++++++
 .../core/server/impl/ServerConsumerImpl.java    | 14 +++++++++++++
 .../integration/cli/DummyServerConsumer.java    |  5 +++++
 .../core/server/impl/fakes/FakeConsumer.java    |  5 +++++
 9 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 9cd3fa7..856e865 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -251,6 +251,13 @@ public interface Message {
    /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
    Message copy(long newID);
 
+   default boolean acceptsConsumer(long uniqueConsumerID) {
+      return true;
+   }
+
+   default void rejectConsumer(long uniqueConsumerID) {
+   }
+
    /**
     * Returns the messageID.
     * <br>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d241958..08953a2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -81,6 +81,8 @@ public class AMQPMessage extends RefCountMessage {
    private long scheduledTime = -1;
    private String connectionID;
 
+   Set<Object> rejectedConsumers;
+
    public AMQPMessage(long messageFormat, byte[] data) {
       this.data = Unpooled.wrappedBuffer(data);
       this.messageFormat = messageFormat;
@@ -323,6 +325,26 @@ public class AMQPMessage extends RefCountMessage {
       return AMQPMessagePersister.getInstance();
    }
 
+   @Override
+   public synchronized boolean acceptsConsumer(long consumer) {
+
+      if (rejectedConsumers == null) {
+         return true;
+      } else {
+         return !rejectedConsumers.contains(consumer);
+      }
+   }
+
+   @Override
+   public synchronized void rejectConsumer(long consumer) {
+      if (rejectedConsumers == null) {
+         rejectedConsumers = new HashSet<>();
+      }
+
+      rejectedConsumers.add(consumer);
+   }
+
+
    private synchronized void partialDecode(ByteBuffer buffer) {
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setByteBuffer(buffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fb540a8..69d156b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -78,7 +79,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private static final Symbol SHARED = Symbol.valueOf("shared");
    private static final Symbol GLOBAL = Symbol.valueOf("global");
 
-   private Object brokerConsumer;
+   private Consumer brokerConsumer;
 
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
@@ -391,7 +392,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
       boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
       try {
-         brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
+         brokerConsumer = (Consumer)sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
       } catch (ActiveMQAMQPResourceLimitExceededException e1) {
          throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
       } catch (Exception e) {
@@ -553,6 +554,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             } else if (remoteState instanceof Modified) {
                try {
                   Modified modification = (Modified) remoteState;
+
+                  if (Boolean.TRUE.equals(modification.getUndeliverableHere())) {
+                     message.rejectConsumer(((Consumer)brokerConsumer).sequentialID());
+                  }
+
                   if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
                      sessionSPI.cancel(brokerConsumer, message, true);
                   } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 58c7d81..50c0b01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -68,4 +68,7 @@ public interface Consumer {
     * disconnect the consumer
     */
    void disconnect();
+
+   /** an unique sequential ID for this consumer */
+   long sequentialID();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index fe43532..c1a0ccc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -86,6 +86,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final UUID nodeUUID;
 
+   private final long sequentialID;
+
    private final SimpleString name;
 
    private final Queue queue;
@@ -170,6 +172,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                      final String password,
                      final StorageManager storageManager) {
 
+      this.sequentialID = storageManager.generateID();
+
       this.reconnectAttempts = reconnectAttempts;
 
       this.reconnectAttemptsInUse = initialConnectAttempts;
@@ -245,6 +249,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
+   @Override
    public synchronized void start() throws Exception {
       if (started) {
          return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 26399dc..eff8d67 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -52,6 +52,8 @@ public class Redistributor implements Consumer {
 
    private int count;
 
+   private final long sequentialID;
+
    // a Flush executor here is happening inside another executor.
    // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
    // So, instead of using a future we will use a plain ReusableLatch here
@@ -64,6 +66,8 @@ public class Redistributor implements Consumer {
                         final int batchSize) {
       this.queue = queue;
 
+      this.sequentialID = storageManager.generateID();
+
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
@@ -74,6 +78,11 @@ public class Redistributor implements Consumer {
    }
 
    @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
+   @Override
    public Filter getFilter() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 3552b93..9e33602 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -77,6 +77,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private final long id;
 
+   private final long sequentialID;
+
    protected final Queue messageQueue;
 
    private final Filter filter;
@@ -180,6 +182,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                              final ActiveMQServer server) throws Exception {
       this.id = id;
 
+      this.sequentialID = server.getStorageManager().generateID();
+
       this.filter = filter;
 
       this.session = session;
@@ -232,6 +236,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    // ServerConsumer implementation
    // ----------------------------------------------------------------------
 
+
+   @Override
+   public long sequentialID() {
+      return sequentialID;
+   }
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -343,6 +353,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          }
          final Message message = ref.getMessage();
 
+         if (!message.acceptsConsumer(sequentialID())) {
+            return HandleStatus.NO_MATCH;
+         }
+
          if (filter != null && !filter.match(message)) {
             if (logger.isTraceEnabled()) {
                logger.trace("Reference " + ref + " is a noMatch on consumer " + this);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 78b3e09..968c31b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -59,6 +59,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public long sequentialID() {
+      return 0;
+   }
+
+   @Override
    public Object getProtocolContext() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1f4473e8/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 665686c..1db8347 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -82,6 +82,11 @@ public class FakeConsumer implements Consumer {
       delayCountdown = numReferences;
    }
 
+   @Override
+   public long sequentialID() {
+      return 0;
+   }
+
    public synchronized List<MessageReference> getReferences() {
       return references;
    }


Mime
View raw message