activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1468679 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ test/csharp/
Date Tue, 16 Apr 2013 23:27:36 GMT
Author: tabish
Date: Tue Apr 16 23:27:35 2013
New Revision: 1468679

URL: http://svn.apache.org/r1468679
Log:
Add configuration options for features that need to be added to the NMS.ActiveMQ lib to
sync up with the Java and C++ clients.

Wire up the ConnectionAudit bits in Connection and ensure that consumers and sessions get removed
from the Audit when disposed; this will allow Consumers to support duplicate Message filtering
later on.

Fix https://issues.apache.org/jira/browse/AMQNET-426 by throwing the correct exception.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/InvalidCredentialsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
Tue Apr 16 23:27:35 2013
@@ -43,7 +43,7 @@ namespace Apache.NMS.ActiveMQ
             this.info.PrefetchSize = 1000;
             this.info.NoLocal = true;
 
-            this.connection.addDispatcher(consumerId, this);
+            this.connection.AddDispatcher(consumerId, this);
             this.connection.SyncRequest(this.info);
         }
 
@@ -62,7 +62,7 @@ namespace Apache.NMS.ActiveMQ
                 {
                     Tracer.Debug("Failed to send remove for AdvisoryConsumer: " + e.Message);
                 }
-                this.connection.removeDispatcher(this.info.ConsumerId);
+                this.connection.RemoveDispatcher(this.info.ConsumerId);
             }
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Apr
16 23:27:35 2013
@@ -50,6 +50,15 @@ namespace Apache.NMS.ActiveMQ
 		private int producerWindowSize = 0;
 		private bool messagePrioritySupported = true;
 		private bool watchTopicAdviosires = true;
+		private bool optimizeAcknowledge;
+    	private long optimizeAcknowledgeTimeOut = 300;
+    	private long optimizedAckScheduledAckInterval = 0;
+	    private bool useRetroactiveConsumer;
+	    private bool exclusiveConsumer;
+	    private long consumerFailoverRedeliveryWaitPeriod = 0;
+	    private bool checkForDuplicates = true;
+	    private bool transactedIndividualAck = false;
+		private bool nonBlockingRedelivery = false;
 
 		private bool userSpecifiedClientID;
 		private readonly Uri brokerUri;
@@ -84,6 +93,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 		private AdvisoryConsumer advisoryConsumer = null;
+		private readonly ConnectionAudit connectionAudit = new ConnectionAudit();
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -100,6 +110,7 @@ namespace Apache.NMS.ActiveMQ
 			this.info.FaultTolerant = transport.IsFaultTolerant;
 
 			this.messageTransformation = new ActiveMQMessageTransformation(this);
+			this.connectionAudit.CheckForDuplicates = transport.IsFaultTolerant;
 		}
 
 		~Connection()
@@ -275,6 +286,72 @@ namespace Apache.NMS.ActiveMQ
 			set { this.messagePrioritySupported = value; }
 		}
 
+    	public bool OptimizeAcknowledge 
+		{
+			get { return this.optimizeAcknowledge; }
+			set { this.optimizeAcknowledge = value; }
+		}
+
+    	public long OptimizeAcknowledgeTimeOut
+		{
+			get { return this.optimizeAcknowledgeTimeOut; }
+			set { this.optimizeAcknowledgeTimeOut = value; }
+		}
+
+		public long OptimizedAckScheduledAckInterval
+		{
+			get { return this.optimizedAckScheduledAckInterval; }
+			set { this.optimizedAckScheduledAckInterval = value; }
+		}
+
+		public bool UseRetroactiveConsumer
+		{
+			get { return this.useRetroactiveConsumer; }
+			set { this.useRetroactiveConsumer = value; }
+		}
+
+		public bool ExclusiveConsumer
+		{
+			get { return this.exclusiveConsumer; }
+			set { this.exclusiveConsumer = value; }
+		}
+
+		public long ConsumerFailoverRedeliveryWaitPeriod
+		{
+			get { return this.consumerFailoverRedeliveryWaitPeriod; }
+			set { this.consumerFailoverRedeliveryWaitPeriod = value; }
+		}
+
+		public bool CheckForDuplicates
+		{
+			get { return this.checkForDuplicates; }
+			set { this.checkForDuplicates = value; }
+		}
+
+		public bool TransactedIndividualAck
+		{
+			get { return this.transactedIndividualAck; }
+			set { this.transactedIndividualAck = value; }
+		}
+
+		public bool NonBlockingRedelivery
+		{
+			get { return this.nonBlockingRedelivery; }
+			set { this.nonBlockingRedelivery = value; }
+		}
+
+		public int AuditDepth
+		{
+			get { return this.connectionAudit.AuditDepth; }
+			set { this.connectionAudit.AuditDepth = value; }
+		}
+
+		public int AuditMaximumProducerNumber
+		{
+			get { return this.connectionAudit.AuditMaximumProducerNumber; }
+			set { this.connectionAudit.AuditMaximumProducerNumber = value; }
+		}
+
 		public IConnectionMetaData MetaData
 		{
 			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
@@ -498,10 +575,11 @@ namespace Apache.NMS.ActiveMQ
 			if(!this.closing.Value)
 			{
 				sessions.Remove(session);
+				RemoveDispatcher(session);
 			}
 		}
 
-		internal void addDispatcher(ConsumerId id, IDispatcher dispatcher)
+		internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
 		{
 			if(!this.closing.Value)
 			{
@@ -509,7 +587,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		internal void removeDispatcher(ConsumerId id)
+		internal void RemoveDispatcher(ConsumerId id)
 		{
 			if(!this.closing.Value)
 			{
@@ -517,7 +595,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		internal void addProducer(ProducerId id, MessageProducer producer)
+		internal void AddProducer(ProducerId id, MessageProducer producer)
 		{
 			if(!this.closing.Value)
 			{
@@ -525,7 +603,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		internal void removeProducer(ProducerId id)
+		internal void RemoveProducer(ProducerId id)
 		{
 			if(!this.closing.Value)
 			{
@@ -533,6 +611,21 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+	    internal void RemoveDispatcher(IDispatcher dispatcher) 
+		{
+	        this.connectionAudit.RemoveDispatcher(dispatcher);
+	    }
+
+	    internal bool IsDuplicate(IDispatcher dispatcher, Message message) 
+		{
+	        return this.checkForDuplicates && this.connectionAudit.IsDuplicate(dispatcher,
message);
+	    }
+
+	    internal void RollbackDuplicate(IDispatcher dispatcher, Message message)
+		{
+	        this.connectionAudit.RollbackDuplicate(dispatcher, message);
+	    }
+
 		public void Close()
 		{
 			if(!this.closed.Value && !transportFailed.Value)
@@ -703,8 +796,23 @@ namespace Apache.NMS.ActiveMQ
 				{
 					ExceptionResponse exceptionResponse = (ExceptionResponse) response;
 					Exception exception = CreateExceptionFromBrokerError(exceptionResponse.Exception);
+
+					// Security exception on connect means this Connection is unusable, close the
+					// transport now to free its resources.
+					if (exception is NMSSecurityException && command.IsConnectionInfo)
+					{
+						try
+						{
+							transport.Dispose();
+						}
+						catch
+						{
+						}
+					}
+
 					throw exception;
 				}
+
 				return response;
 			}
 			catch(NMSException)
@@ -1388,7 +1496,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(exceptionClassName.StartsWith("java.lang.SecurityException"))
 				{
-					exceptionClassName = "Apache.NMS.InvalidClientIDException";
+					exceptionClassName = "Apache.NMS.NMSSecurityException";
 				}
 				else if(!exceptionClassName.StartsWith("Apache.NMS"))
 				{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Tue Apr 16 23:27:35 2013
@@ -50,8 +50,19 @@ namespace Apache.NMS.ActiveMQ
 		private int producerWindowSize = 0;
 		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-		private bool messagePrioritySupported=true;
-        private bool watchTopicAdvisories=true;
+		private bool messagePrioritySupported = true;
+        private bool watchTopicAdvisories = true;
+    	private bool optimizeAcknowledge;
+    	private long optimizeAcknowledgeTimeOut = 300;
+    	private long optimizedAckScheduledAckInterval = 0;
+	    private bool useRetroactiveConsumer;
+	    private bool exclusiveConsumer;
+	    private long consumerFailoverRedeliveryWaitPeriod = 0;
+	    private bool checkForDuplicates = true;
+	    private bool transactedIndividualAck = false;
+		private bool nonBlockingRedelivery = false;
+		private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+    	private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
 
 		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -71,13 +82,11 @@ namespace Apache.NMS.ActiveMQ
 #endif
 		}
 
-		public ConnectionFactory()
-			: this(GetDefaultBrokerUrl())
+		public ConnectionFactory() : this(GetDefaultBrokerUrl())
 		{
 		}
 
-		public ConnectionFactory(string brokerUri)
-			: this(brokerUri, null)
+		public ConnectionFactory(string brokerUri) : this(brokerUri, null)
 		{
 		}
 
@@ -86,8 +95,7 @@ namespace Apache.NMS.ActiveMQ
 		{
 		}
 
-		public ConnectionFactory(Uri brokerUri)
-			: this(brokerUri, null)
+		public ConnectionFactory(Uri brokerUri) : this(brokerUri, null)
 		{
 		}
 
@@ -402,6 +410,72 @@ namespace Apache.NMS.ActiveMQ
 			set { this.producerTransformer = value; }
 		}
 
+    	public bool OptimizeAcknowledge 
+		{
+			get { return this.optimizeAcknowledge; }
+			set { this.optimizeAcknowledge = value; }
+		}
+
+    	public long OptimizeAcknowledgeTimeOut
+		{
+			get { return this.optimizeAcknowledgeTimeOut; }
+			set { this.optimizeAcknowledgeTimeOut = value; }
+		}
+
+		public long OptimizedAckScheduledAckInterval
+		{
+			get { return this.optimizedAckScheduledAckInterval; }
+			set { this.optimizedAckScheduledAckInterval = value; }
+		}
+
+		public bool UseRetroactiveConsumer
+		{
+			get { return this.useRetroactiveConsumer; }
+			set { this.useRetroactiveConsumer = value; }
+		}
+
+		public bool ExclusiveConsumer
+		{
+			get { return this.exclusiveConsumer; }
+			set { this.exclusiveConsumer = value; }
+		}
+
+		public long ConsumerFailoverRedeliveryWaitPeriod
+		{
+			get { return this.consumerFailoverRedeliveryWaitPeriod; }
+			set { this.consumerFailoverRedeliveryWaitPeriod = value; }
+		}
+
+		public bool CheckForDuplicates
+		{
+			get { return this.checkForDuplicates; }
+			set { this.checkForDuplicates = value; }
+		}
+
+		public bool TransactedIndividualAck
+		{
+			get { return this.transactedIndividualAck; }
+			set { this.transactedIndividualAck = value; }
+		}
+
+		public bool NonBlockingRedelivery
+		{
+			get { return this.nonBlockingRedelivery; }
+			set { this.nonBlockingRedelivery = value; }
+		}
+
+		public int AuditDepth
+		{
+			get { return this.auditDepth; }
+			set { this.auditDepth = value; }
+		}
+
+		public int AuditMaximumProducerNumber
+		{
+			get { return this.auditMaximumProducerNumber; }
+			set { this.auditMaximumProducerNumber = value; }
+		}
+
 		#endregion
 
 		protected virtual void ConfigureConnection(Connection connection)
@@ -423,6 +497,17 @@ namespace Apache.NMS.ActiveMQ
 			connection.ConsumerTransformer = this.consumerTransformer;
 			connection.ProducerTransformer = this.producerTransformer;
             connection.WatchTopicAdvisories = this.watchTopicAdvisories;
+			connection.OptimizeAcknowledge = this.optimizeAcknowledge;
+			connection.OptimizeAcknowledgeTimeOut = this.optimizeAcknowledgeTimeOut;
+			connection.OptimizedAckScheduledAckInterval = this.optimizedAckScheduledAckInterval;
+			connection.UseRetroactiveConsumer = this.useRetroactiveConsumer;
+			connection.ExclusiveConsumer = this.exclusiveConsumer;
+			connection.ConsumerFailoverRedeliveryWaitPeriod = this.consumerFailoverRedeliveryWaitPeriod;
+			connection.CheckForDuplicates = this.checkForDuplicates;
+			connection.TransactedIndividualAck = this.transactedIndividualAck;
+			connection.NonBlockingRedelivery = this.nonBlockingRedelivery;
+			connection.AuditDepth = this.auditDepth;
+			connection.AuditMaximumProducerNumber = this.auditMaximumProducerNumber;
 		}
 
 		protected static void ExceptionHandler(Exception ex)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Tue Apr 16 23:27:35 2013
@@ -416,7 +416,7 @@ namespace Apache.NMS.ActiveMQ
 					}
 				}
 
-				this.session.RemoveConsumer(this.ConsumerId);
+				this.session.RemoveConsumer(this);
 				this.unconsumedMessages.Close();
 
 				if(Tracer.IsDebugEnabled)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs Tue
Apr 16 23:27:35 2013
@@ -114,7 +114,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(consumer != null)
 				{
-					this.session.RemoveConsumer(consumer.ConsumerId);
+					this.session.RemoveConsumer(consumer);
 					consumer.Close();
 				}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Tue Apr
16 23:27:35 2013
@@ -501,7 +501,7 @@ namespace Apache.NMS.ActiveMQ
             {
                 if(consumer != null)
                 {
-                    this.RemoveConsumer(consumer.ConsumerId);
+                    this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
@@ -542,7 +542,7 @@ namespace Apache.NMS.ActiveMQ
             {
                 if(consumer != null)
                 {
-                    this.RemoveConsumer(consumer.ConsumerId);
+                    this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
@@ -795,17 +795,18 @@ namespace Apache.NMS.ActiveMQ
 
                 // Registered with Connection before we register at the broker.
                 consumers[id] = consumer;
-                connection.addDispatcher(id, this);
+                connection.AddDispatcher(id, this);
             }
         }
 
-        public void RemoveConsumer(ConsumerId objectId)
+        public void RemoveConsumer(MessageConsumer consumer)
         {
-            connection.removeDispatcher(objectId);
+            connection.RemoveDispatcher(consumer.ConsumerId);
             if(!this.closing)
             {
-                consumers.Remove(objectId);
+                consumers.Remove(consumer.ConsumerId);
             }
+			connection.RemoveDispatcher(consumer);
         }
 
         public void AddProducer(MessageProducer producer)
@@ -815,13 +816,13 @@ namespace Apache.NMS.ActiveMQ
                 ProducerId id = producer.ProducerId;
 
                 this.producers[id] = producer;
-                this.connection.addProducer(id, producer);
+                this.connection.AddProducer(id, producer);
             }
         }
 
         public void RemoveProducer(ProducerId objectId)
         {
-            connection.removeProducer(objectId);
+            connection.RemoveProducer(objectId);
             if(!this.closing)
             {
                 producers.Remove(objectId);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/InvalidCredentialsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/InvalidCredentialsTest.cs?rev=1468679&r1=1468678&r2=1468679&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/InvalidCredentialsTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/InvalidCredentialsTest.cs
Tue Apr 16 23:27:35 2013
@@ -43,11 +43,12 @@ namespace Apache.NMS.ActiveMQ.Test
 			// To run this test successfully, the broker must have secure login enabled.
 			// This test will attempt to login to the server using invalid credentials.
 			// It should not connect, and should not go into failover retry loop.
-			// It will then attempt to login with correct credentials, and it should be succcessful
on second attempt.
+			// It will then attempt to login with correct credentials, and it should be 
+			// succcessful on second attempt.
 			Assert.IsTrue(CreateNMSFactory("InvalidCredentials-BogusUser"));
 			using(IConnection connection = CreateConnection())
 			{
-				Assert.Throws(typeof(InvalidClientIDException), () => { connection.Start(); }, "You
may not have enabled credential login on the broker.  Credentials must be enabled for this
test to pass.");
+				Assert.Throws(typeof(NMSSecurityException), () => { connection.Start(); }, "You may
not have enabled credential login on the broker.  Credentials must be enabled for this test
to pass.");
 			}
 
 			// Now connect with a valid user account.



Mime
View raw message