activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r380656 - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Commands/ tests/OpenWire.Client/
Date Fri, 24 Feb 2006 12:47:17 GMT
Author: jstrachan
Date: Fri Feb 24 04:47:14 2006
New Revision: 380656

URL: http://svn.apache.org/viewcvs?rev=380656&view=rev
Log:
added support for explicit client acknowledgement of messages or for auto-acknowledge

Modified:
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessage.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ISession.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs Fri Feb 24 04:47:14 2006
@@ -22,6 +22,8 @@
 
 namespace OpenWire.Client.Commands
 {
+    public delegate void AcknowledgeHandler(ActiveMQMessage message);
+    
     public class ActiveMQMessage : Message, IMessage, MarshallAware
     {
         public const byte ID_ActiveMQMessage = 23;
@@ -30,7 +32,7 @@
         
         private PrimitiveMap properties;
         
-        
+        public event AcknowledgeHandler Acknowledger;
         
         public static ActiveMQMessage Transform(IMessage message)
         {
@@ -44,6 +46,15 @@
         public override byte GetDataStructureType()
         {
             return ID_ActiveMQMessage;
+        }
+        
+        public void Acknowledge()
+        {
+            if (Acknowledger == null){
+                throw new OpenWireException("No Acknowledger has been associated with this message: " + this);}
+            else {
+                Acknowledger(this);
+            }
         }
         
         

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs Fri Feb 24 04:47:14 2006
@@ -21,7 +21,7 @@
         private bool transacted;
         private bool connected;
         private bool closed;
-        private AcknowledgementMode acknowledgementMode;
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         private long sessionCounter;
         private long temporaryDestinationCounter;
         private IDictionary consumers = new Hashtable(); // TODO threadsafe
@@ -35,20 +35,20 @@
             this.transport.Start();
         }
  
-		/// <summary>
-		/// Starts message delivery for this connection.
-		/// </summary>
-		public void Start() 
-		{
-		}
+        /// <summary>
+        /// Starts message delivery for this connection.
+        /// </summary>
+        public void Start()
+        {
+        }
         
         
         /// <summary>
-		/// Stop message delivery for this connection.
-		/// </summary>
-		public void Stop() 
-		{
-		}
+        /// Stop message delivery for this connection.
+        /// </summary>
+        public void Stop()
+        {
+        }
         
         /// <summary>
         /// Creates a new session to work on this connection
@@ -66,7 +66,7 @@
             CheckConnected();
             SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
             SyncRequest(info);
-            Session session = new Session(this, info);
+            Session session = new Session(this, info, acknowledgementMode);
             sessions.Add(session);
             return session;
         }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessage.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessage.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessage.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessage.cs Fri Feb 24 04:47:14 2006
@@ -26,6 +26,12 @@
     {
         
         /// <summary>
+        /// If using client acknowledgement mode on the session then this method will acknowledge that the
+        /// message has been processed correctly.
+        /// </summary>
+        void Acknowledge();
+        
+        /// <summary>
         /// Provides access to the message properties (headers)
         /// </summary>
         IPrimitiveMap Properties {

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs Fri Feb 24 04:47:14 2006
@@ -27,12 +27,12 @@
     public interface IMessageConsumer : IDisposable
     {
         
-		/// <summary>
-		/// Waits until a message is available and returns it
-		/// </summary>
-		IMessage Receive();
-		
-		/// <summary>
+        /// <summary>
+        /// Waits until a message is available and returns it
+        /// </summary>
+        IMessage Receive();
+        
+        /// <summary>
         /// If a message is available within the timeout duration it is returned otherwise this method returns null
         /// </summary>
         IMessage Receive(long timeout);
@@ -46,6 +46,5 @@
         /// An asynchronous listener which can be used to consume messages asynchronously
         /// </summary>
         event MessageHandler Listener;
-        
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ISession.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ISession.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ISession.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/ISession.cs Fri Feb 24 04:47:14 2006
@@ -26,9 +26,6 @@
     public interface ISession : IDisposable
     {
         
-        
-        
-        
         /// <summary>
         /// Creates a producer of messages
         /// </summary>
@@ -50,9 +47,9 @@
         IMessageConsumer CreateConsumer(IDestination destination, string selector);
         
         /// <summary>
-		/// Creates a named durable consumer of messages on a given destination with a selector
-		/// </summary>
-		IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);
+        /// Creates a named durable consumer of messages on a given destination with a selector
+        /// </summary>
+        IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);
             
         /// <summary>
         /// Returns the queue for the given name

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs Fri Feb 24 04:47:14 2006
@@ -22,6 +22,13 @@
 
 namespace OpenWire.Client
 {
+    public enum AckType {
+        DeliveredAck = 0, // Message delivered but not consumed
+        ConsumedAck = 1, // Message consumed, discard
+        PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway
+    }
+    
+    
     /// <summary>
     /// An object capable of receiving messages from some destination
     /// </summary>
@@ -30,15 +37,17 @@
         
         private Session session;
         private ConsumerInfo info;
+        private AcknowledgementMode acknowledgementMode;
         private bool closed;
         private Dispatcher dispatcher = new Dispatcher();
         
         public event MessageHandler Listener;
         
-        public MessageConsumer(Session session, ConsumerInfo info)
+        public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.session = session;
             this.info = info;
+            this.acknowledgementMode = acknowledgementMode;
         }
         
         /// <summary>
@@ -53,21 +62,23 @@
         public IMessage Receive()
         {
             CheckClosed();
-            return dispatcher.Dequeue();
+            return AutoAcknowledge(dispatcher.Dequeue());
         }
         
         public IMessage Receive(long timeout)
         {
             CheckClosed();
-            return dispatcher.Dequeue(timeout);
+            return AutoAcknowledge(dispatcher.Dequeue(timeout));
         }
         
         public IMessage ReceiveNoWait()
         {
             CheckClosed();
-            return dispatcher.DequeueNoWait();
+            return AutoAcknowledge(dispatcher.DequeueNoWait());
         }
         
+        
+        
         public void Dispose()
         {
             session.DisposeOf(info.ConsumerId);
@@ -81,5 +92,53 @@
                 throw new ConnectionClosedException();
             }
         }
+        
+        protected IMessage AutoAcknowledge(IMessage message)
+        {
+            if (message is ActiveMQMessage)
+            {
+                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+                
+                // lets register the handler for client acknowledgment
+                activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
+                
+                if (acknowledgementMode != AcknowledgementMode.ClientAcknowledge)
+                {
+                    DoAcknowledge(activeMessage);
+                }
+            }
+            return message;
+        }
+        
+        protected void DoClientAcknowledge(Message message)
+        {
+            if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
+            {
+                DoAcknowledge(message);
+            }
+        }
+
+        protected void DoAcknowledge(Message message)
+        {
+            MessageAck ack = CreateMessageAck(message);
+            //Console.WriteLine("Sending Ack: " + ack);
+            session.Connection.SyncRequest(ack);
+        }
+        
+        
+        protected virtual MessageAck CreateMessageAck(Message message)
+        {
+            MessageAck ack = new MessageAck();
+            ack.AckType = (int) AckType.ConsumedAck;
+            ack.ConsumerId = info.ConsumerId;
+            ack.Destination = message.Destination;
+            ack.FirstMessageId = message.MessageId;
+            ack.LastMessageId = message.MessageId;
+            ack.MessageCount = 1;
+            ack.TransactionId = message.TransactionId;
+            return ack;
+        }
+        
+        
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs Fri Feb 24 04:47:14 2006
@@ -26,16 +26,17 @@
     public class Session : ISession
     {
         private Connection connection;
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         private SessionInfo info;
+        private AcknowledgementMode acknowledgementMode;
         private long consumerCounter;
         private long producerCounter;
         private int prefetchSize = 1000;
         
-        public Session(Connection connection, SessionInfo info)
+        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.connection = connection;
             this.info = info;
+            this.acknowledgementMode = acknowledgementMode;
         }
         
         public void Dispose()
@@ -55,14 +56,6 @@
             return new MessageProducer(this, command);
         }
         
-        public void Acknowledge(Message message)
-        {
-            if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
-            {
-                MessageAck ack = CreateMessageAck(message);
-                connection.SyncRequest(ack);
-            }
-        }
         
         
         public IMessageConsumer CreateConsumer(IDestination destination)
@@ -77,7 +70,7 @@
             
             try
             {
-                MessageConsumer consumer = new MessageConsumer(this, command);
+                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
                 // lets register the consumer first in case we start dispatching messages immediately
                 connection.AddConsumer(consumerId, consumer);
                 
@@ -91,28 +84,28 @@
             }
         }
         
-		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
-		{
-			ConsumerInfo command = CreateConsumerInfo(destination, selector);
-			ConsumerId consumerId = command.ConsumerId;
-			command.SubcriptionName = name;
-			command.NoLocal = noLocal;
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            ConsumerInfo command = CreateConsumerInfo(destination, selector);
+            ConsumerId consumerId = command.ConsumerId;
+            command.SubcriptionName = name;
+            command.NoLocal = noLocal;
             
-			try
-			{
-				MessageConsumer consumer = new MessageConsumer(this, command);
-				// lets register the consumer first in case we start dispatching messages immediately
-				connection.AddConsumer(consumerId, consumer);
+            try
+            {
+                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+                // lets register the consumer first in case we start dispatching messages immediately
+                connection.AddConsumer(consumerId, consumer);
                 
-				connection.SyncRequest(command);
-				return consumer;
-			}
-			catch (Exception e)
-			{
-				connection.RemoveConsumer(consumerId);
-				throw e;
-			}
-		}
+                connection.SyncRequest(command);
+                return consumer;
+            }
+            catch (Exception e)
+            {
+                connection.RemoveConsumer(consumerId);
+                throw e;
+            }
+        }
 
         public IQueue GetQueue(string name)
         {
@@ -176,7 +169,12 @@
         }
         
         
-        
+        // Properties
+        public Connection Connection {
+            get {
+                return connection;
+            }
+        }
         
         // Implementation methods
         public void DoSend(IDestination destination, IMessage message)
@@ -234,13 +232,6 @@
             answer.ProducerId = id;
             answer.Destination = ActiveMQDestination.Transform(destination);
             return answer;
-        }
-        
-        protected virtual MessageAck CreateMessageAck(Message message)
-        {
-            MessageAck ack = new MessageAck();
-            // TODO complete packet
-            return ack;
         }
         
         /// <summary>

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs?rev=380656&r1=380655&r2=380656&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs Fri Feb 24 04:47:14 2006
@@ -105,14 +105,14 @@
             Assert.AreEqual(custom4, message.Properties["custom4"], "custom4");
             // TODO
             //Assert.AreEqual(custom5, message.Properties["custom5"], "custom5");
-            Assert.AreEqual(custom4, message.Properties["custom6"], "custom6");
+            Assert.AreEqual(custom6, message.Properties["custom6"], "custom6");
             
             Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1");
             Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2");
             Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3");
             Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4");
             //Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5");
-            Assert.AreEqual(custom4, message.Properties.GetChar("custom6"), "custom6");
+            Assert.AreEqual(custom6, message.Properties.GetChar("custom6"), "custom6");
             
             // lets now look at some standard JMS headers
             Console.WriteLine("JMSExpiration: " + message.JMSExpiration);



Mime
View raw message