activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-468 Amendments to how redelivery count is handled on openwire
Date Thu, 07 Apr 2016 18:00:27 GMT
ARTEMIS-468 Amendments to how redelivery count is handled on openwire


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/50eac7c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/50eac7c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/50eac7c8

Branch: refs/heads/master
Commit: 50eac7c824e586aa858fb1f56676feffd4be7523
Parents: 8a998ad
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Apr 7 12:12:16 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Apr 7 13:56:31 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  5 +++++
 .../core/protocol/mqtt/MQTTSessionCallback.java |  6 +++++
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../openwire/OpenWireMessageConverter.java      | 16 +++++---------
 .../core/protocol/openwire/amq/AMQConsumer.java | 22 ++++++-------------
 .../core/protocol/openwire/amq/AMQSession.java  | 13 ++++++++++-
 .../core/protocol/stomp/StompSession.java       |  5 +++++
 .../protocol/core/impl/CoreSessionCallback.java |  5 +++++
 .../artemis/core/server/ConsumerListener.java   | 23 --------------------
 .../core/server/impl/ServerConsumerImpl.java    | 21 +++++++++---------
 .../spi/core/protocol/SessionCallback.java      |  9 ++++++++
 .../integration/client/HangConsumerTest.java    |  5 +++++
 12 files changed, 71 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 55bade9..1c6ea01 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -344,6 +344,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 82b1ed6..57cb7fe 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -54,6 +54,12 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+
+   @Override
    public int sendLargeMessageContinuation(ServerConsumer consumerID,
                                            byte[] body,
                                            boolean continues,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3ccb98d..818d305 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
             throw new IllegalStateException("Session not exist! : " + sessionId);
          }
 
-         List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+         List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
 
          this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
          ss.addConsumer(info);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 4516253..cfe5b47 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
    private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
    private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
    private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
 
    private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter {
          }
       }
 
-      coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter());
       ActiveMQDestination replyTo = messageSend.getReplyTo();
       if (replyTo != null) {
          ByteSequence replyToBytes = marshaller.marshal(replyTo);
@@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
    }
 
-   public static MessageDispatch createMessageDispatch(ServerMessage message,
+   public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
-      md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+      md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
       md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
       md.setMessage(amqMessage);
       ActiveMQDestination destination = amqMessage.getDestination();
@@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       return md;
    }
 
-   private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
+   private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
       byte coreType = coreMessage.getType();
       switch (coreType) {
@@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter {
          amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
       }
 
-      Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER);
-      if (redeliveryCounter != null) {
-         amqMsg.setRedeliveryCounter(redeliveryCounter);
-      }
+      amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1);
 
       byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
       if (replyToBytes != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 81cdec8..01820d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
-public class AMQConsumer implements ConsumerListener {
+public class AMQConsumer {
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
@@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener {
             return 0;
          }
 
-         dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
+         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
@@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener {
       }
    }
 
-   @Override
-   public void updateForCanceledRef(MessageReference ref) {
+   public void updateDeliveryCountAfterCancel(MessageReference ref) {
       long seqId = ref.getMessage().getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
-      ServerMessage coreMessage = ref.getMessage();
-      int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
-      if (openwireDestination.isTopic()) {
-         redeliveryCounter++;
-         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
-      }
-      else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
-         redeliveryCounter++;
-         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+
+      // This is a specific rule of the protocol
+      if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
+         ref.decrementDeliveryCount();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 74dd951..84354cd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback {
 
    }
 
+   @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      if (consumer.getProtocolData() != null) {
+         ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
+         return true;
+      }
+      else {
+         return false;
+      }
+
+   }
+
    public List<AMQConsumer> createConsumer(ConsumerInfo info,
-                                           AMQSession amqSession,
                                            SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 9b5c70d..8db5720 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -119,6 +119,11 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
       LargeServerMessageImpl largeMessage = null;
       ServerMessage newServerMessage = serverMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 9d6125b..f4d69d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -57,6 +57,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
       Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
deleted file mode 100644
index 2b2be9c..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-/**
- */
-public interface ConsumerListener {
-   void updateForCanceledRef(MessageReference ref);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 12ca54c..5fb6018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             ref.incrementDeliveryCount();
 
             // If updateDeliveries = false (set by strict-update),
-            // the updateDeliveryCount would still be updated after c
+            // the updateDeliveryCountAfterCancel would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged()) {
                if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                   !ref.getQueue().isInternalQueue() &&
@@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
-      if (!failed) {
-         // We don't decrement delivery count if the client failed, since there's a possibility that refs
-         // were actually delivered but we just didn't get any acks for them
-         // before failure
-         ref.decrementDeliveryCount();
-      }
-
-      if (this.protocolData instanceof ConsumerListener) {
-         ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+      // We first update the deliveryCount at the protocol callback...
+      // if that wasn't updated (if there is no specific logic, then we apply the default logic used on most protocols
+      if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) {
+         if (!failed) {
+            // We don't decrement delivery count if the client failed, since there's a possibility that refs
+            // were actually delivered but we just didn't get any acks for them
+            // before failure
+            ref.decrementDeliveryCount();
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index cf0ec69..9f23f80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -33,6 +33,15 @@ public interface SessionCallback {
     *  like acks or other operations. */
    void afterDelivery() throws Exception;
 
+   /**
+    * Use this to updates specifics on the message after a redelivery happened.
+    * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments.
+    * @param consumer
+    * @param ref
+    * @param failed
+    */
+   boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed);
+
    void sendProducerCreditsMessage(int credits, SimpleString address);
 
    void sendProducerCreditsFailMessage(int credits, SimpleString address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index a3bae65..2c88896 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+         return false;
+      }
+
+      @Override
       public void browserFinished(ServerConsumer consumer) {
 
       }


Mime
View raw message