Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DD96519AE6 for ; Thu, 7 Apr 2016 18:00:26 +0000 (UTC) Received: (qmail 4443 invoked by uid 500); 7 Apr 2016 18:00:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 4405 invoked by uid 500); 7 Apr 2016 18:00:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 4306 invoked by uid 99); 7 Apr 2016 18:00:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Apr 2016 18:00:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 44C68E17A8; Thu, 7 Apr 2016 18:00:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 07 Apr 2016 18:00:26 -0000 Message-Id: <5458e7ba77174fd6be8753055db5d9cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] activemq-artemis git commit: ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests Repository: activemq-artemis Updated Branches: refs/heads/master cf4636e96 -> c4a092c1c ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8a998ad8 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8a998ad8 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8a998ad8 Branch: refs/heads/master Commit: 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30 Parents: cf4636e Author: Howard Gao Authored: Wed Apr 6 20:59:57 2016 +0800 Committer: Clebert Suconic Committed: Thu Apr 7 12:12:28 2016 -0400 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 8 ++++-- .../core/protocol/openwire/amq/AMQConsumer.java | 30 ++++++++++++++++---- .../artemis/core/server/ConsumerListener.java | 23 +++++++++++++++ .../core/server/impl/ServerConsumerImpl.java | 6 ++++ .../core/server/impl/ServerSessionImpl.java | 1 + 5 files changed, 60 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 53464cc..4516253 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; + public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter { } public static MessageDispatch createMessageDispatch(ServerMessage message, - int deliveryCount, AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + //we can use core message id for sequenceId + amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); + md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); + md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); - md.setRedeliveryCounter(deliveryCount); ActiveMQDestination destination = amqMessage.getDestination(); md.setDestination(destination); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3093ed8..81cdec8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer { +public class AMQConsumer implements ConsumerListener { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -108,7 +110,6 @@ public class AMQConsumer { } serverConsumer.setProtocolData(this); - } private SimpleString createTopicSubscription(boolean isDurable, @@ -184,8 +185,8 @@ public class AMQConsumer { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { return 0; } - //decrement deliveryCount as AMQ client tends to add 1. - dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); + + dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -215,7 +216,6 @@ public class AMQConsumer { * Notice that we will start a new transaction on the cases where there is no transaction. */ public void acknowledge(MessageAck ack) throws Exception { - MessageId first = ack.getFirstMessageId(); MessageId last = ack.getLastMessageId(); @@ -252,6 +252,10 @@ public class AMQConsumer { } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { + Throwable poisonCause = ack.getPoisonCause(); + if (poisonCause != null) { + ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString()); + } ref.getQueue().sendToDeadLetterAddress(transaction, ref); } } @@ -303,6 +307,22 @@ public class AMQConsumer { } } + @Override + public void updateForCanceledRef(MessageReference ref) { + long seqId = ref.getMessage().getMessageID(); + long lastDelSeqId = info.getLastDeliveredSequenceId(); + ServerMessage coreMessage = ref.getMessage(); + int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); + if (openwireDestination.isTopic()) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + } + /** * The MessagePullHandler is used with slow consumer policies. * */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java new file mode 100644 index 0000000..2b2be9c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +/** + */ +public interface ConsumerListener { + void updateForCanceledRef(MessageReference ref); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7860ed8..12ca54c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties; * Concrete implementation of a ClientConsumer. */ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { + //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log"); // Constants ------------------------------------------------------------------------------------ private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // before failure ref.decrementDeliveryCount(); } + + if (this.protocolData instanceof ConsumerListener) { + ((ConsumerListener)protocolData).updateForCanceledRef(ref); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 31102aa..7a817f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void doClose(final boolean failed) throws Exception { synchronized (this) { + this.setStarted(false); if (closed) return;