activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r830998 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs main/csharp/MessageConsumer.cs main/csharp/Session.cs test/csharp/IndividualAckTest.cs
Date Thu, 29 Oct 2009 15:22:50 GMT
Author: tabish
Date: Thu Oct 29 15:22:48 2009
New Revision: 830998

URL: http://svn.apache.org/viewvc?rev=830998&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-199

Added the support for Individual Ack of messages and a unit test case for it.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=830998&r1=830997&r2=830998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Oct
29 15:22:48 2009
@@ -314,7 +314,7 @@
             return session;
         }
 
-        public void RemoveSession(Session session)
+        internal void RemoveSession(Session session)
         {
             if(!this.closing)
             {
@@ -322,22 +322,22 @@
             }
         }
 
-        public void addDispatcher( ConsumerId id, IDispatcher dispatcher )
+        internal void addDispatcher( ConsumerId id, IDispatcher dispatcher )
         {
             this.dispatchers.Add( id, dispatcher );
         }
 
-        public void removeDispatcher( ConsumerId id )
+        internal void removeDispatcher( ConsumerId id )
         {
             this.dispatchers.Remove( id );
         }
         
-        public void addProducer( ProducerId id, MessageProducer producer )
+        internal void addProducer( ProducerId id, MessageProducer producer )
         {
             this.producers.Add( id, producer );
         }
 
-        public void removeProducer( ProducerId id )
+        internal void removeProducer( ProducerId id )
         {
             this.producers.Remove( id );
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=830998&r1=830997&r2=830998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Thu Oct 29 15:22:48 2009
@@ -312,9 +312,50 @@
 
+        /// <summary>
+        /// Acknowledges a single previously dispatched message using an
+        /// IndividualAck.  The dispatch whose MessageId equals that of the given
+        /// message is removed from the dispatched list and a MessageAck with a
+        /// MessageCount of 1 is sent one-way over the session's connection.
+        /// </summary>
+        /// <param name="message">The message whose MessageId selects the dispatch to ack.</param>
         protected void DoIndividualAcknowledge(ActiveMQMessage message)
         {
-            // TODO
-        }
+            MessageDispatch dispatch = null;
+
+            lock(this.dispatchedMessages)
+            {
+                // Every outstanding dispatch must be examined: the message being
+                // acknowledged is not necessarily the first one in the list.
+                foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
+                {
+                    if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+                    {
+                        dispatch = originalDispatch;
+                        // Removing while enumerating is only acceptable here because
+                        // the loop is exited immediately, before the enumerator advances.
+                        this.dispatchedMessages.Remove(originalDispatch);
+                        break;
+                    }
+                }
+            }
+
+            if(dispatch == null)
+            {
+                // No matching dispatch was found in the dispatched list, so there
+                // is nothing to acknowledge and no ack is sent.
+                return;
+            }
+
+            MessageAck ack = new MessageAck();
+
+            ack.AckType = (byte)AckType.IndividualAck;
+            ack.ConsumerId = this.info.ConsumerId;
+            ack.Destination = dispatch.Destination;
+            ack.LastMessageId = dispatch.Message.MessageId;
+            ack.MessageCount = 1;
 
+            this.session.Connection.Oneway(ack);
+        }
+                
 		protected void DoNothingAcknowledge(ActiveMQMessage message)
 		{
 		}
@@ -728,7 +753,7 @@
             }
         }
 
-        private void Acknowledge()
+        internal void Acknowledge()
         {
             lock(this.dispatchedMessages)
             {
@@ -760,23 +785,6 @@
             }            
         }        
 
-        private void Acknowledge(MessageDispatch dispatch)
-        {
-            MessageAck ack = new MessageAck();
-
-            ack.AckType = (byte)AckType.IndividualAck;
-            ack.ConsumerId = this.info.ConsumerId;
-            ack.Destination = dispatch.Destination;
-            ack.LastMessageId = dispatch.Message.MessageId;
-            ack.MessageCount = 1;            
-
-            this.session.Connection.Oneway(ack);
-            lock(this.dispatchedMessages)
-            {
-                this.dispatchedMessages.Remove(dispatch);
-            }
-        }
-        
         private void Commit()
         {
             lock(this.dispatchedMessages)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=830998&r1=830997&r2=830998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Oct
29 15:22:48 2009
@@ -189,7 +189,7 @@
 
         public bool IsIndividualAcknowledge
         {
-            get { return false; }
+            get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge;
}
         }
 
         public bool IsTransacted
@@ -756,7 +756,7 @@
             }
         }
 
-        public void Redispatch(MessageDispatchChannel channel)
+        internal void Redispatch(MessageDispatchChannel channel)
         {
             MessageDispatch[] messages = channel.RemoveAll();
             System.Array.Reverse(messages);
@@ -775,7 +775,7 @@
             }
         }
 
-        public void ClearMessagesInProgress() 
+        internal void ClearMessagesInProgress() 
         {        
             if( this.executor != null ) {
                 this.executor.ClearMessagesInProgress();
@@ -789,15 +789,15 @@
                 }
             }
         }
-        
-        public void DeliverAcks() 
-        {        
+
+        internal void Acknowledge()
+        {
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers)
                 {
-                    consumer.DeliverAcks();
-                }
+                    consumer.Acknowledge();
+                }                
             }
         }
         

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs?rev=830998&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
Thu Oct 29 15:22:48 2009
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    [TestFixture]    
+    public class IndividualAckTest : NMSTestSupport
+    {
+        private IConnection connection;
+        
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+    
+            connection = CreateConnection();
+            connection.Start();
+        }
+
+        [TearDown]
+        public override void TearDown()
+        {
+            connection.Close();
+            base.TearDown();
+        }
+
+        [Test]
+        public void TestAckedMessageAreConsumed() 
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+            ITemporaryQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.Send(session.CreateTextMessage("Hello"));
+    
+            // Consume the message...
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            msg.Acknowledge();
+    
+            // Reset the session.
+            session.Close();
+            session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+    
+            // Attempt to Consume the message...
+            consumer = session.CreateConsumer(queue);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNull(msg);
+    
+            session.Close();
+        }
+
+        [Test]
+        public void TestLastMessageAcked() 
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+            ITemporaryQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            ITextMessage msg1 = session.CreateTextMessage("msg1");
+            ITextMessage msg2 = session.CreateTextMessage("msg2");
+            ITextMessage msg3 = session.CreateTextMessage("msg3");
+            producer.Send(msg1);
+            producer.Send(msg2);
+            producer.Send(msg3);
+    
+            // Consume the message...
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);        
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            msg.Acknowledge();
+    
+            // Reset the session.
+            session.Close();
+            session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+    
+            // Attempt to Consume the message...
+            consumer = session.CreateConsumer(queue);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            Assert.AreEqual(msg1,msg);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            Assert.AreEqual(msg2,msg);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNull(msg);
+            session.Close();
+        }
+
+        [Test]
+        public void TestUnAckedMessageAreNotConsumedOnSessionClose() 
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+            ITemporaryQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            producer.Send(session.CreateTextMessage("Hello"));
+    
+            // Consume the message...
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);        
+            // Don't ack the message.
+            
+            // Reset the session.  This should cause the unacknowledged message to be re-delivered.
+            session.Close();
+            session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+                    
+            // Attempt to Consume the message...
+            consumer = session.CreateConsumer(queue);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+            Assert.IsNotNull(msg);        
+            msg.Acknowledge();
+            
+            session.Close();
+        }
+        
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message