activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [01/50] [abbrv] activemq-nms-openwire git commit: https://issues.apache.org/jira/browse/AMQNET-489
Date Wed, 08 Mar 2017 23:12:03 GMT
Repository: activemq-nms-openwire
Updated Branches:
  refs/heads/1.2.x [created] 7599d7fdb
  refs/heads/1.3.x [created] 633899cef
  refs/heads/1.4.x [created] 408861aee
  refs/heads/1.5.x [created] 6c88b806b
  refs/heads/1.6.x [created] 94ad40978
  refs/heads/1.7.x [created] dab33daf2
  refs/heads/master [created] a3bfe8bed


https://issues.apache.org/jira/browse/AMQNET-489

Merge in fixes from AMQ-5146 to honor the redelivery policy on messages dispatched from the
Broker.


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/commit/22fbad24
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/tree/22fbad24
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/diff/22fbad24

Branch: refs/heads/1.6.x
Commit: 22fbad2421cdf043f2c09936cd6ac49ae197630f
Parents: a0f256c
Author: Timothy A. Bish <tabish@apache.org>
Authored: Thu Aug 21 21:20:25 2014 +0000
Committer: Timothy A. Bish <tabish@apache.org>
Committed: Thu Aug 21 21:20:25 2014 +0000

----------------------------------------------------------------------
 src/main/csharp/MessageConsumer.cs             |  60 +++++++-
 src/main/csharp/Threads/CompositeTaskRunner.cs |   1 -
 src/test/csharp/AMQRedeliveryPolicyTest.cs     | 158 ++++++++++++++++++++
 3 files changed, 213 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/blob/22fbad24/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 3c02041..3d77291 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -39,6 +39,8 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
+        private const int NO_MAXIMUM_REDELIVERIES = -1;
+
         private readonly MessageTransformation messageTransformation;
         private readonly MessageDispatchChannel unconsumedMessages;
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
@@ -789,7 +791,15 @@ namespace Apache.NMS.ActiveMQ
 						{
 							if(listener != null && this.unconsumedMessages.Running)
 							{
-								dispatchMessage = true;
+                                if (RedeliveryExceeded(dispatch)) 
+                                {
+                                    PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+                                    return;
+                                } 
+                                else
+                                {
+								    dispatchMessage = true;
+                                }
 							}
 							else
 							{
@@ -1014,6 +1024,11 @@ namespace Apache.NMS.ActiveMQ
 						}
 					}
 				}
+                else if (RedeliveryExceeded(dispatch))
+                {
+                    Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
+                    PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+                }
 				else
 				{
 					return dispatch;
@@ -1231,7 +1246,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 
 	        // evaluate both expired and normal msgs as otherwise consumer may get stalled
-			if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
+			if ((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
 			{
 				this.session.SendAck(pendingAck);
 				this.pendingAck = null;
@@ -1250,6 +1265,18 @@ namespace Apache.NMS.ActiveMQ
 	        this.session.Connection.SyncRequest(ack);
 	    }
 
+        private void PosionAck(MessageDispatch dispatch, string cause)
+        {
+            BrokerError poisonCause = new BrokerError();
+            poisonCause.ExceptionClass = "javax.jms.JMSException";
+            poisonCause.Message = cause;
+
+            MessageAck posionAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
+            posionAck.FirstMessageId = dispatch.Message.MessageId;
+            posionAck.PoisonCause = poisonCause;
+            this.session.Connection.SyncRequest(posionAck);
+        }
+
 	    private void RegisterSync()
 		{
 			// Don't acknowledge now, but we may need to let the broker know the
@@ -1387,14 +1414,19 @@ namespace Apache.NMS.ActiveMQ
                                                this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
                         }
 
+                        BrokerError poisonCause = new BrokerError();
+                        poisonCause.ExceptionClass = "javax.jms.JMSException";
+                        poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;
+
 						if (lastMd.RollbackCause != null)
 						{
 							BrokerError cause = new BrokerError();
-							cause.ExceptionClass = "javax.jms.JMSException";
-							cause.Message = lastMd.RollbackCause.Message;
-							ack.PoisonCause = cause;
+                            poisonCause.ExceptionClass = "javax.jms.JMSException";
+                            poisonCause.Message = lastMd.RollbackCause.Message;
+                            poisonCause.Cause = cause;
 						}
                     	ack.FirstMessageId = firstMsgId;
+                        ack.PoisonCause = poisonCause;
 
 						this.session.SendAck(ack);
 
@@ -1743,6 +1775,24 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+        private bool RedeliveryExceeded(MessageDispatch dispatch) 
+        {
+            try 
+            {
+                ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;
+
+                return session.IsTransacted && redeliveryPolicy != null &&
+                       redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES &&
+                       dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries &&
+                       // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+                       !amqMessage.Properties.Contains("redeliveryDelay");
+            }
+            catch (Exception ignored) 
+            {
+                return false;
+            }
+        }
+
 		#endregion
 
 		#region Nested ISyncronization Types

http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/blob/22fbad24/src/main/csharp/Threads/CompositeTaskRunner.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Threads/CompositeTaskRunner.cs b/src/main/csharp/Threads/CompositeTaskRunner.cs
index 96305c6..b19bdd4 100755
--- a/src/main/csharp/Threads/CompositeTaskRunner.cs
+++ b/src/main/csharp/Threads/CompositeTaskRunner.cs
@@ -17,7 +17,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Threading;
 
 namespace Apache.NMS.ActiveMQ.Threads

http://git-wip-us.apache.org/repos/asf/activemq-nms-openwire/blob/22fbad24/src/test/csharp/AMQRedeliveryPolicyTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/AMQRedeliveryPolicyTest.cs b/src/test/csharp/AMQRedeliveryPolicyTest.cs
index 2cab249..2ebdf0d 100644
--- a/src/test/csharp/AMQRedeliveryPolicyTest.cs
+++ b/src/test/csharp/AMQRedeliveryPolicyTest.cs
@@ -19,6 +19,7 @@ using System;
 using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test
@@ -27,6 +28,7 @@ namespace Apache.NMS.ActiveMQ.Test
     public class AMQRedeliveryPolicyTest : NMSTestSupport
     {
         private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest";
+        private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
 
         [Test]
         public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
@@ -443,5 +445,161 @@ namespace Apache.NMS.ActiveMQ.Test
                 session.Rollback();
             }
         }
+
+        [Test]
+        public void TestRepeatedRedeliveryReceiveNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+
+                ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = dlqSession.CreateProducer(destination);
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                const int maxRedeliveries = 4;
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) CreateConnection())
+                    {
+                        // Receive a message with the JMS API
+                        IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                        ActiveMQTextMessage m = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ActiveMQTextMessage;
+                        if (m != null) 
+                        {
+                            Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter);
+                        }
+
+                        if (i <= maxRedeliveries)
+                        {
+                            Assert.IsNotNull(m);
+                            Assert.AreEqual("1st", m.Text);
+                            Assert.AreEqual(i, m.RedeliveryCounter);
+                        } 
+                        else
+                        {
+                            Assert.IsNull(m, "null on exceeding redelivery count");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        [Test]
+        public void TestRepeatedRedeliveryOnMessageNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+                ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = dlqSession.CreateProducer(destination);
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+
+                const int maxRedeliveries = 4;
+                Atomic<int> receivedCount = new Atomic<int>(0);
+
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) CreateConnection())
+                    {
+                        IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = session.CreateConsumer(destination);
+                        OnMessageNoCommitCallback callback = new OnMessageNoCommitCallback(receivedCount);
+                        consumer.Listener += new MessageListener(callback.consumer_Listener);
+
+                        if (i <= maxRedeliveries) 
+                        {
+                            Assert.IsTrue(callback.Await(), "listener should have dispatched a message");
+                        } 
+                        else 
+                        {
+                            // final redlivery gets poisoned before dispatch
+                            Assert.IsFalse(callback.Await(), "listener should not have dispatched after max redliveries");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        class OnMessageNoCommitCallback
+        {
+            private Atomic<int> receivedCount;
+            private CountDownLatch done = new CountDownLatch(1);
+
+            public OnMessageNoCommitCallback(Atomic<int> receivedCount)
+            {
+                this.receivedCount = receivedCount;
+            }
+
+            public bool Await() 
+            {
+                return done.await(TimeSpan.FromMilliseconds(5000));
+            }
+
+            public void consumer_Listener(IMessage message)
+            {
+                ActiveMQTextMessage m = message as ActiveMQTextMessage;
+                Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter);
+                Assert.AreEqual("1st", m.Text);
+                Assert.AreEqual(receivedCount.Value, m.RedeliveryCounter);
+                receivedCount.GetAndSet(receivedCount.Value + 1);
+                done.countDown();
+            }
+        }
     }
 }


Mime
View raw message