activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r381319 - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Core/ tests/OpenWire.Client/
Date Mon, 27 Feb 2006 11:28:48 GMT
Author: jstrachan
Date: Mon Feb 27 03:28:35 2006
New Revision: 381319

URL: http://svn.apache.org/viewcvs?rev=381319&view=rev
Log:
added support for asynchronous consumption to .Net using a MessageListener along with fixing
up some nant build errors

Added:
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs
Modified:
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/BrokerException.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
    incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs
    incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/BrokerException.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/BrokerException.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/BrokerException.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/BrokerException.cs Mon Feb
27 03:28:35 2006
@@ -41,7 +41,7 @@
             }
         }
         
-        public virtual string StackTrace
+        public virtual string JavaStackTrace
         {
             get {
                 return brokerError.StackTrace;

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Connection.cs Mon Feb 27
03:28:35 2006
@@ -2,6 +2,7 @@
 using System.Collections;
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
+using System.Threading;
 
 namespace OpenWire.Client
 {
@@ -108,6 +109,18 @@
                     throw new OpenWireException("You cannot change the ClientId once the
Connection is connected");
                 }
                 info.ClientId = value;
+            }
+        }
+        
+        public BrokerInfo BrokerInfo {
+            get {
+                return brokerInfo;
+            }
+        }
+        
+        public WireFormatInfo BrokerWireFormat {
+            get {
+                return brokerWireFormatInfo;
             }
         }
         

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
(original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs
Mon Feb 27 03:28:35 2006
@@ -340,8 +340,6 @@
         /// <summary>
         /// Switches from one endian to the other
         /// </summary>
-        /// <param name="value">An int</param>
-        /// <returns>An int</retutns>
         public static int SwitchEndian(int x)
         {
             return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000)
>> 8) | (x >> 24));
@@ -799,8 +797,6 @@
         /// <summary>
         /// Method ReadFloat
         /// </summary>
-        /// <param name="dataIn">A  BinaryReader</param>
-        /// <returns>An Object</retutns>
         private static Object ReadFloat(BinaryReader dataIn)
         {
             // TODO: Implement this method

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs Mon Feb
27 03:28:35 2006
@@ -17,6 +17,7 @@
 using System.Collections;
 using OpenWire.Client.Commands;
 using System;
+using OpenWire.Client;
 
 namespace OpenWire.Client.Core
 {
@@ -26,11 +27,10 @@
     public class Dispatcher
     {
         Queue queue = Queue.Synchronized( new Queue() );
-        
+
         /// <summary>
         /// Method Enqueue
         /// </summary>
-        /// <param name="message">An ActiveMQMessage</param>
         public void Enqueue(ActiveMQMessage message)
         {
             queue.Enqueue(message);
@@ -39,7 +39,6 @@
         /// <summary>
         /// Method DequeueNoWait
         /// </summary>
-        /// <returns>An IMessage</retutns>
         public IMessage DequeueNoWait()
         {
             lock (queue)
@@ -55,8 +54,6 @@
         /// <summary>
         /// Method Dequeue
         /// </summary>
-        /// <param name="timeout">A  long</param>
-        /// <returns>An IMessage</retutns>
         public IMessage Dequeue(long timeout)
         {
             // TODO
@@ -66,12 +63,10 @@
         /// <summary>
         /// Method Dequeue
         /// </summary>
-        /// <returns>An IMessage</retutns>
         public IMessage Dequeue()
         {
             return (IMessage) queue.Dequeue();
         }
-        
         
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs Mon
Feb 27 03:28:35 2006
@@ -94,7 +94,10 @@
         
         public Object Unmarshal(BinaryReader dis)
         {
-            int size = DataStreamMarshaller.ReadInt(dis);
+            // lets ignore the size of the packet
+            DataStreamMarshaller.ReadInt(dis);
+            
+            // first byte is the type of the packet
             byte dataType = DataStreamMarshaller.ReadByte(dis);
             if (dataType != NULL_TYPE)
             {
@@ -206,7 +209,6 @@
         /// <summary>
         /// Method CreateMagicBytes
         /// </summary>
-        /// <returns>A  byte[]</retutns>
         private byte[] CreateMagicBytes()
         {
             byte[] answer = new byte[MAGIC.Length];

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs Mon Feb
27 03:28:35 2006
@@ -19,7 +19,7 @@
 
 namespace OpenWire.Client
 {
-    public delegate void MessageHandler(IMessage message);
+    public delegate void MessageListener(IMessage message);
     
     /// <summary>
     /// A consumer of messages
@@ -45,6 +45,6 @@
         /// <summary>
         /// An asynchronous listener which can be used to consume messages asynchronously
         /// </summary>
-        event MessageHandler Listener;
+        event MessageListener Listener;
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs Mon Feb
27 03:28:35 2006
@@ -41,7 +41,7 @@
         private bool closed;
         private Dispatcher dispatcher = new Dispatcher();
         
-        public event MessageHandler Listener;
+        public event MessageListener Listener;
         
         public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
         {
@@ -50,6 +50,13 @@
             this.acknowledgementMode = acknowledgementMode;
         }
         
+        public ConsumerId ConsumerId {
+            get {
+                return info.ConsumerId;
+            }
+        }
+
+        
         /// <summary>
         /// Method Dispatch
         /// </summary>
@@ -57,6 +64,11 @@
         public void Dispatch(ActiveMQMessage message)
         {
             dispatcher.Enqueue(message);
+            
+            if (Listener != null) {
+                // lets dispatch to the thread pool for this connection for messages to be
processed
+                ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
+            }
         }
         
         public IMessage Receive()
@@ -83,6 +95,22 @@
         {
             session.DisposeOf(info.ConsumerId);
             closed = true;
+        }
+        
+        /// <summary>
+        /// Dispatch any pending messages to the asynchronous listener
+        /// </summary>
+        public void DispatchAsyncMessages()
+        {
+            while (Listener != null) {
+                IMessage message = dispatcher.DequeueNoWait();
+                if (message != null) {
+                    Listener(message);
+                }
+                else {
+                    break;
+                }
+            }
         }
         
         protected void CheckClosed()

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs Mon Feb 27 03:28:35
2006
@@ -17,6 +17,7 @@
 using System;
 using OpenWire.Client.Commands;
 using OpenWire.Client.Core;
+using System.Collections;
 
 namespace OpenWire.Client
 {
@@ -31,6 +32,7 @@
         private long consumerCounter;
         private long producerCounter;
         private int prefetchSize = 1000;
+        private IDictionary consumers = new Hashtable();
         
         public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
         {
@@ -75,6 +77,8 @@
                 connection.AddConsumer(consumerId, consumer);
                 
                 connection.SyncRequest(command);
+                
+                consumers[consumerId] = consumer;
                 return consumer;
             }
             catch (Exception e)
@@ -98,6 +102,8 @@
                 connection.AddConsumer(consumerId, consumer);
                 
                 connection.SyncRequest(command);
+                
+                consumers[consumerId] = consumer;
                 return consumer;
             }
             catch (Exception e)
@@ -194,12 +200,22 @@
         
         public void DisposeOf(ConsumerId objectId)
         {
+            consumers.Remove(objectId);
             connection.RemoveConsumer(objectId);
             RemoveInfo command = new RemoveInfo();
             command.ObjectId = objectId;
             connection.SyncRequest(command);
         }
         
+        public void DispatchAsyncMessages(object state) {
+            // lets iterate through each consumer created by this session
+            // ensuring that they have all pending messages dispatched
+            foreach (MessageConsumer consumer in consumers.Values) {
+                consumer.DispatchAsyncMessages();
+            }
+        }
+        
+        
         protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string
selector)
         {
             ConsumerInfo answer = new ConsumerInfo();
@@ -237,11 +253,8 @@
         /// <summary>
         /// Configures the message command
         /// </summary>
-        /// <param name="activeMQMessage">An ActiveMQMessage</param>
-        /// <returns>An IMessage</retutns>
         protected void Configure(ActiveMQMessage message)
         {
         }
-        
     }
 }

Added: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs?rev=381319&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs (added)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs Mon
Feb 27 03:28:35 2006
@@ -0,0 +1,77 @@
+using System;
+using System.IO;
+using System.Threading;
+
+using NUnit.Framework;
+
+using OpenWire.Client;
+using OpenWire.Client.Core;
+namespace OpenWire.Client
+{
+    [TestFixture]
+    public class AsyncConsumeTest : TestSupport
+    {
+        protected Object semaphore = new Object();
+        protected bool received;
+        
+        [Test]
+        public void TestAsynchronousConsume()
+        {
+            IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
+            Assert.IsTrue(factory != null, "no factory created");
+            
+            using (IConnection connection = factory.CreateConnection())
+            {
+                Assert.IsTrue(connection != null, "no connection created");
+                Console.WriteLine("Connected to ActiveMQ!");
+                
+                ISession session = connection.CreateSession();
+                IDestination destination = CreateDestination(session);
+                Assert.IsTrue(destination != null, "No queue available!");
+                
+                // lets create an async consumer
+                // START SNIPPET: demo
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+                consumer.Listener += new MessageListener(OnMessage);
+                // END SNIPPET: demo
+                
+                
+                // now lets send a message
+                session = connection.CreateSession();
+                IMessageProducer producer = session.CreateProducer(destination);
+                IMessage request = CreateMessage(session);
+                request.JMSCorrelationID = "abc";
+                request.JMSType = "Test";
+                producer.Send(request);
+                
+                
+                WaitForMessageToArrive();
+            }
+            
+        }
+        
+        protected void OnMessage(IMessage message)
+        {
+            Console.WriteLine("Received message: " + message);
+            lock (semaphore)
+            {
+                received = true;
+                Monitor.PulseAll(semaphore);
+            }
+            
+        }
+        
+        protected void WaitForMessageToArrive()
+        {
+            lock (semaphore)
+            {
+                if (!received)
+                {
+                    Monitor.Wait(semaphore, 10000);
+                }
+            }
+            Assert.AreEqual(true, received, "Should have received a message by now!");
+        }
+        
+    }
+}

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs Mon Feb
27 03:28:35 2006
@@ -17,13 +17,15 @@
                 try
                 {
                     IMessageConsumer consumer = session.CreateConsumer(null);
+                    Console.WriteLine("Created consumer: " + consumer);
+                    
                     Assert.Fail("Should  have thrown an exception!");
                 }
                 catch (BrokerException e)
                 {
                     Console.WriteLine("Caught expected exception: " + e);
                     Console.WriteLine("Stack: " + e.StackTrace);
-                    Console.WriteLine("BrokerStrack: " + e.BrokerError.StackTrace);
+                    Console.WriteLine("Java Stack: " + e.JavaStackTrace);
                 }
             }
         }

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs?rev=381319&r1=381318&r2=381319&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs Mon Feb
27 03:28:35 2006
@@ -70,7 +70,7 @@
                 }
             }
         }
-
+        
         protected virtual IDestination CreateDestination(ISession session)
         {
             string name = "Test.DotNet." + GetType().Name;
@@ -80,11 +80,13 @@
             return destination;
         }
         
-        protected virtual IMessage CreateMessage(ISession session) {
+        protected virtual IMessage CreateMessage(ISession session)
+        {
             return session.CreateMessage();
         }
         
-        protected virtual  void AssertValidMessage(IMessage message) {
+        protected virtual  void AssertValidMessage(IMessage message)
+        {
             Assert.IsNotNull(message, "Null Message!");
         }
     }



Mime
View raw message