activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests
Date Thu, 07 Apr 2016 18:00:26 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master cf4636e96 -> c4a092c1c


ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8a998ad8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8a998ad8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8a998ad8

Branch: refs/heads/master
Commit: 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30
Parents: cf4636e
Author: Howard Gao <howard.gao@gmail.com>
Authored: Wed Apr 6 20:59:57 2016 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Apr 7 12:12:28 2016 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      |  8 ++++--
 .../core/protocol/openwire/amq/AMQConsumer.java | 30 ++++++++++++++++----
 .../artemis/core/server/ConsumerListener.java   | 23 +++++++++++++++
 .../core/server/impl/ServerConsumerImpl.java    |  6 ++++
 .../core/server/impl/ServerSessionImpl.java     |  1 +
 5 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 53464cc..4516253 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
    private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
    private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
+   public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
    private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
 
    private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    public static MessageDispatch createMessageDispatch(ServerMessage message,
-                                                       int deliveryCount,
                                                        AMQConsumer consumer) throws IOException,
JMSException {
       ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
+      //we can use core message id for sequenceId
+      amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
+      md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+      md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
       md.setMessage(amqMessage);
-      md.setRedeliveryCounter(deliveryCount);
       ActiveMQDestination destination = amqMessage.getDestination();
       md.setDestination(destination);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3093ed8..81cdec8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
-public class AMQConsumer {
+public class AMQConsumer implements ConsumerListener {
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
@@ -108,7 +110,6 @@ public class AMQConsumer {
       }
 
       serverConsumer.setProtocolData(this);
-
    }
 
    private SimpleString createTopicSubscription(boolean isDurable,
@@ -184,8 +185,8 @@ public class AMQConsumer {
          if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message))
{
             return 0;
          }
-         //decrement deliveryCount as AMQ client tends to add 1.
-         dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount
- 1, this);
+
+         dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
@@ -215,7 +216,6 @@ public class AMQConsumer {
     *  Notice that we will start a new transaction on the cases where there is no transaction.
*/
    public void acknowledge(MessageAck ack) throws Exception {
 
-
       MessageId first = ack.getFirstMessageId();
       MessageId last = ack.getLastMessageId();
 
@@ -252,6 +252,10 @@ public class AMQConsumer {
          }
          else if (ack.isPoisonAck()) {
             for (MessageReference ref : ackList) {
+               Throwable poisonCause = ack.getPoisonCause();
+               if (poisonCause != null) {
+                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
+               }
                ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }
          }
@@ -303,6 +307,22 @@ public class AMQConsumer {
       }
    }
 
+   @Override
+   public void updateForCanceledRef(MessageReference ref) {
+      long seqId = ref.getMessage().getMessageID();
+      long lastDelSeqId = info.getLastDeliveredSequenceId();
+      ServerMessage coreMessage = ref.getMessage();
+      int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
+      if (openwireDestination.isTopic()) {
+         redeliveryCounter++;
+         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+      }
+      else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId)
{
+         redeliveryCounter++;
+         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+      }
+   }
+
    /**
     * The MessagePullHandler is used with slow consumer policies.
     * */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
new file mode 100644
index 0000000..2b2be9c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/**
+ */
+public interface ConsumerListener {
+   void updateForCanceledRef(MessageReference ref);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7860ed8..12ca54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
  * Concrete implementation of a ClientConsumer.
  */
 public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
+   //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
    // Constants ------------------------------------------------------------------------------------
 
    private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
          // before failure
          ref.decrementDeliveryCount();
       }
+
+      if (this.protocolData instanceof ConsumerListener) {
+         ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 31102aa..7a817f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
{
 
    protected void doClose(final boolean failed) throws Exception {
       synchronized (this) {
+         this.setStarted(false);
          if (closed)
             return;
 


Mime
View raw message