activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r512740 - in /activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ: Connection.cs ConnectionFactory.cs MessageProducer.cs Session.cs
Date Wed, 28 Feb 2007 12:55:45 GMT
Author: jstrachan
Date: Wed Feb 28 04:55:44 2007
New Revision: 512740

URL: http://svn.apache.org/viewvc?view=rev&rev=512740
Log:
added a fix for AMQNET-41 to allow async sending

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Wed Feb 28 04:55:44 2007
@@ -34,6 +34,7 @@
         private WireFormatInfo brokerWireFormatInfo; // from broker
         private IList sessions = new ArrayList();
         private IDictionary consumers = new Hashtable(); // TODO threadsafe
+        private bool asyncSend;
         private bool connected;
         private bool closed;
         private long sessionCounter;
@@ -59,14 +60,25 @@
 			get { return started.Value; }
 		}
 
+		
 		/// <summary>
-		/// Starts asynchronous message delivery of incoming messages for this connection. 
+		/// This property indicates whether or not async send is enabled.
+		/// </summary>
+		public bool AsyncSend
+		{
+			get { return asyncSend; }
+			set { asyncSend = value; }
+		}
+		
+		
+		/// <summary>
+		/// Starts asynchronous message delivery of incoming messages for this connection.
 		/// Synchronous delivery is unaffected.
 		/// </summary>
 		public void Start()
 		{
 			CheckConnected();
-			if (started.CompareAndSet(false, true)) 
+			if (started.CompareAndSet(false, true))
 			{
 				foreach(Session session in sessions)
 				{
@@ -82,7 +94,7 @@
 		public void Stop()
 		{
 			CheckConnected();
-			if (started.CompareAndSet(true, false)) 
+			if (started.CompareAndSet(true, false))
 			{
 				foreach(Session session in sessions)
 				{

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/ConnectionFactory.cs Wed Feb 28 04:55:44 2007
@@ -28,6 +28,9 @@
     /// </summary>
     public class ConnectionFactory : IConnectionFactory
     {
+		public const string DEFAULT_BROKER_URL = "tcp://localhost:61616";
+		public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
+		
         private Uri brokerUri;
         private string userName;
         private string password;
@@ -35,8 +38,11 @@
         
 		public static string GetDefaultBrokerUrl()
 		{
-			// TODO look in system properties / environment variables
-			return "tcp://localhost:61616";
+			string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+			if (answer == null) {
+				answer = DEFAULT_BROKER_URL;
+			}
+			return answer;
 		}
 		
         public ConnectionFactory()

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageProducer.cs Wed Feb 28 04:55:44 2007
@@ -31,6 +31,7 @@
         
         private bool persistent = NMSConstants.defaultPersistence;
         private TimeSpan timeToLive;
+		private bool specifiedTimeToLive;
         private byte priority = NMSConstants.defaultPriority;
         private bool disableMessageID = false;
         private bool disableMessageTimestamp = false;
@@ -48,7 +49,7 @@
         
         public void Send(IDestination destination, IMessage message)
         {
-			Send(destination, message, Persistent, Priority, TimeToLive);
+			Send(destination, message, Persistent, Priority, TimeToLive, specifiedTimeToLive);
 		}
 		
         public void Send(IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
@@ -58,6 +59,11 @@
 		
         public void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
         {
+			Send(destination, message, persistent, priority, timeToLive, true);
+		}
+		
+        public void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+        {
 			ActiveMQMessage activeMessage = (ActiveMQMessage)message;
 
 			if (!disableMessageID)
@@ -71,11 +77,17 @@
 				activeMessage.MessageId = id;
 			}
 
-			if (!disableMessageTimestamp)
+			if (!disableMessageTimestamp && specifiedTimeToLive)
 			{
+				Console.WriteLine(">>> sending message with Timestamp: " + activeMessage.Timestamp + " and timeToLive:  " + timeToLive);
 				activeMessage.Timestamp = ActiveMQ.Util.DateUtils.ToJavaTime(DateTime.UtcNow);
 			}
-
+			
+			if (specifiedTimeToLive)
+			{
+				activeMessage.Expiration = ActiveMQ.Util.DateUtils.ToJavaTime(timeToLive);
+			}
+				
             activeMessage.ProducerId = info.ProducerId;
             activeMessage.Destination = ActiveMQDestination.Transform(destination);
             
@@ -84,8 +96,12 @@
                 session.DoStartTransaction();
                 activeMessage.TransactionId = session.TransactionContext.TransactionId;
             }
-            
-            session.DoSend(destination, message, persistent, priority, timeToLive);
+			
+			activeMessage.Persistent = persistent;
+			activeMessage.Priority = priority;
+			activeMessage.Destination = ActiveMQDestination.Transform(destination);
+		    
+            session.DoSend(activeMessage);
         }
         
         public void Dispose()

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=512740&r1=512739&r2=512740
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Wed Feb 28 04:55:44 2007
@@ -36,6 +36,7 @@
                 private bool dispatchAsync;
                 private bool exclusive;
                 private bool retroactive;
+				private bool asyncSend;
                 private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
                 private TransactionContext transactionContext;
                 private DispatchingThread dispatchingThread;
@@ -45,6 +46,7 @@
                         this.connection = connection;
                         this.info = info;
                         this.acknowledgementMode = acknowledgementMode;
+						this.asyncSend = connection.AsyncSend;
                         transactionContext = new TransactionContext(this);
                         dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
                         dispatchingThread.ExceptionListener += new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
@@ -61,8 +63,8 @@
                 /// until acknowledgements are received.
                 /// </summary>
                 public int PrefetchSize {
-                        get { return prefetchSize; } 
-                        set { this.prefetchSize = value; } 
+                        get { return prefetchSize; }
+                        set { this.prefetchSize = value; }
                 }
 
                 /// <summary>
@@ -72,16 +74,16 @@
                 /// Must be > 0 to enable this feature
                 /// </summary>
                 public int MaximumPendingMessageLimit {
-                        get { return maximumPendingMessageLimit; } 
-                        set { this.maximumPendingMessageLimit = value; } 
+                        get { return maximumPendingMessageLimit; }
+                        set { this.maximumPendingMessageLimit = value; }
                 }
 
                 /// <summary>
                 /// Enables or disables whether asynchronous dispatch should be used by the broker
                 /// </summary>
                 public bool DispatchAsync {
-                        get { return dispatchAsync; } 
-                        set { this.dispatchAsync = value; } 
+                        get { return dispatchAsync; }
+                        set { this.dispatchAsync = value; }
                 }
 
                 /// <summary>
@@ -89,26 +91,35 @@
                 /// only one instance of a consumer is allowed to process messages on a queue to preserve order
                 /// </summary>
                 public bool Exclusive {
-                        get { return exclusive; } 
-                        set { this.exclusive = value; } 
+                        get { return exclusive; }
+                        set { this.exclusive = value; }
                 }
 
                 /// <summary>
                 /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
                 /// </summary>
                 public bool Retroactive {
-                        get { return retroactive; } 
-                        set { this.retroactive = value; } 
+                        get { return retroactive; }
+                        set { this.retroactive = value; }
                 }
 
                 /// <summary>
                 /// Sets the default consumer priority for consumers
                 /// </summary>
                 public byte Priority {
-                        get { return priority; } 
-                        set { this.priority = value; } 
+                        get { return priority; }
+                        set { this.priority = value; }
                 }
-
+		
+				/// <summary>
+				/// This property indicates whether or not async send is enabled.
+				/// </summary>
+				public bool AsyncSend
+				{
+					get { return asyncSend; }
+					set { asyncSend = value; }
+				}
+		
                 public void Dispose()
                 {
                         connection.DisposeOf(info.SessionId);
@@ -316,35 +327,36 @@
                 // Properties
 
                 public Connection Connection {
-                        get { return connection; } 
+                        get { return connection; }
                 }
 
                 public SessionId SessionId {
-                        get { return info.SessionId; } 
+                        get { return info.SessionId; }
                 }
 
                 public AcknowledgementMode AcknowledgementMode {
-                        get { return acknowledgementMode; } 
+                        get { return acknowledgementMode; }
                 }
 
                 public bool Transacted {
-                        get { return acknowledgementMode == AcknowledgementMode.Transactional; } 
+                        get { return acknowledgementMode == AcknowledgementMode.Transactional; }
                 }
 
                 public TransactionContext TransactionContext {
-                        get { return transactionContext; } 
+                        get { return transactionContext; }
                 }
 
                 // Implementation methods
-                public void DoSend(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive)
+				public void DoSend(ActiveMQMessage message)
                 {
-                        ActiveMQMessage command = ActiveMQMessage.Transform(message);
-						command.Persistent = persistent;
-						command.Priority = priority;
-						
-                        // TODO add time to live
-
-                        connection.SyncRequest(command);
+					if (AsyncSend)
+					{
+						connection.OneWay(message);
+					}
+					else
+					{
+						connection.SyncRequest(message);
+					}
                 }
 
                 public void Close()



Mime
View raw message