activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1144402 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Util/ test/csharp/
Date Fri, 08 Jul 2011 17:46:16 GMT
Author: tabish
Date: Fri Jul  8 17:46:16 2011
New Revision: 1144402

URL: http://svn.apache.org/viewvc?rev=1144402&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-334

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs Fri Jul  8 17:46:16 2011
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Consumes Advisory Messages for Temp Destination creation and deletion so that
+    /// the connection can track valid destinations for its sessions, and session resources.
+    /// </summary>
+    internal class AdvisoryConsumer : IDispatcher
+    {
+        private readonly Connection connection;
+        private readonly ConsumerInfo info;
+
+        private bool closed = false;
+        private int deliveredCounter = 0;
+
+        internal AdvisoryConsumer(Connection connection, ConsumerId consumerId) : base()
+        {
+            this.connection = connection;
+            this.info = new ConsumerInfo();
+            this.info.ConsumerId = consumerId;
+            this.info.Destination = AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
+            this.info.PrefetchSize = 1000;
+            this.info.NoLocal = true;
+
+            this.connection.addDispatcher(consumerId, this);
+            this.connection.SyncRequest(this.info);
+        }
+
+        internal void Dispose() // Sends a RemoveInfo for this consumer and unregisters its dispatcher.
+        {
+            if(!closed) // repeated calls are no-ops once the flag below is set
+            {
+                this.closed = true;
+                try
+                {
+                    RemoveInfo removeIt = new RemoveInfo();
+                    removeIt.ObjectId = this.info.ConsumerId;
+                    this.connection.Oneway(removeIt);
+                }
+                catch(Exception e) // best effort: still unregister the dispatcher locally below
+                {
+                    Tracer.Debug("Failed to send remove for AdvisoryConsumer: " + e.Message);
+                }
+                this.connection.removeDispatcher(this.info.ConsumerId);
+            }
+        }
+
+        // Counts the dispatch, acks the broker in batches, then applies any DestinationInfo payload.
+        public void Dispatch(MessageDispatch messageDispatch)
+        {
+            // Auto ack messages when we reach 75% of the prefetch
+            deliveredCounter++;
+            if(deliveredCounter > (0.75 * this.info.PrefetchSize))
+            {
+                try
+                {
+                    MessageAck ack = new MessageAck();
+                    ack.AckType = (byte)AckType.ConsumedAck;
+                    // Identify the consumer and destination so the broker can match the ack.
+                    ack.ConsumerId = messageDispatch.ConsumerId;
+                    ack.Destination = messageDispatch.Destination;
+                    ack.FirstMessageId = messageDispatch.Message.MessageId;
+                    ack.MessageCount = deliveredCounter;
+                    this.connection.Oneway(ack);
+                    this.deliveredCounter = 0;
+                }
+                catch(Exception e)
+                {
+                    this.connection.OnAsyncException(e);
+                }
+            }
+
+            DestinationInfo destInfo = messageDispatch.Message.DataStructure as DestinationInfo;
+            if(destInfo == null)
+            {
+                // This can happen across networks
+                Tracer.Debug("Unexpected message was dispatched to the AdvisoryConsumer: " + messageDispatch);
+                return;
+            }
+            ProcessDestinationInfo(destInfo);
+        }
+
+        private void ProcessDestinationInfo(DestinationInfo destInfo)
+        {
+            ActiveMQDestination dest = destInfo.Destination;
+            if(!dest.IsTemporary)
+            {
+                return;
+            }
+    
+            ActiveMQTempDestination tempDest = dest as ActiveMQTempDestination;
+            if(destInfo.OperationType == DestinationInfo.ADD_OPERATION_TYPE)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("AdvisoryConsumer adding: " + tempDest);
+                }
+                this.connection.AddTempDestination(tempDest);
+            }
+            else if(destInfo.OperationType == DestinationInfo.REMOVE_OPERATION_TYPE)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("AdvisoryConsumer removing: " + tempDest);
+                }
+                this.connection.RemoveTempDestination(tempDest);
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/AdvisoryConsumer.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQDestination.cs Fri Jul  8 17:46:16 2011
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Specialized;
+using System.Collections.Generic;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Commands
@@ -423,6 +424,30 @@ namespace Apache.NMS.ActiveMQ.Commands
 			}
 		}
 
+        /// <summary>
+        /// Gets the Destination Type of this Destination as a String value which is one
+        /// of {Queue,Topic,TempQueue,TempTopic}.
+        /// </summary>
+        /// <returns>
+        /// The Destination Type as a String.
+        /// </returns>
+        public String GetDestinationTypeAsString()
+        {
+            switch(GetDestinationType())
+            {
+                case ACTIVEMQ_QUEUE:
+                    return "Queue";
+                case ACTIVEMQ_TOPIC:
+                    return "Topic";
+                case ACTIVEMQ_TEMPORARY_QUEUE:
+                    return "TempQueue";
+                case ACTIVEMQ_TEMPORARY_TOPIC:
+                    return "TempTopic";
+                default:
+                    throw new NMSException("Invalid destination type: " + GetDestinationType());
+            }
+        }
+
 		/// <summary>
 		/// Returns true if this destination represents a collection of
 		/// destinations; allowing a set of destinations to be published to or subscribed
@@ -439,30 +464,32 @@ namespace Apache.NMS.ActiveMQ.Commands
 			}
 		}
 
-		/*public List GetChildDestinations() {
-		 List answer = new ArrayList();
-		 StringTokenizer iter = new StringTokenizer(physicalName, COMPOSITE_SEPARATOR);
-		 while (iter.hasMoreTokens()) {
-		 String name = iter.nextToken();
-		 Destination child = null;
-		 if (name.StartsWith(QUEUE_PREFIX)) {
-		 child = new ActiveMQQueue(name.Substring(QUEUE_PREFIX.Length));
-		 }
-		 else if (name.StartsWith(TOPIC_PREFIX)) {
-		 child = new ActiveMQTopic(name.Substring(TOPIC_PREFIX.Length));
-		 }
-		 else {
-		 child = createDestination(name);
-		 }
-		 answer.add(child);
-		 }
-		 if (answer.size() == 1) {
-		 // lets put ourselves inside the collection
-		 // as we are not really a composite destination
-		 answer.set(0, this);
-		 }
-		 return answer;
-		 }*/
+        public ActiveMQDestination[] GetCompositeDestinations()
+        {
+            if (IsComposite)
+            {
+                LinkedList<String> list = new LinkedList<String>();
+                String[] composites = physicalName.Split(COMPOSITE_SEPARATOR.ToCharArray());
+                foreach(String composite in composites)
+                {
+                    if (String.IsNullOrEmpty(composite.Trim()))
+                    {
+                        continue;
+                    }
+                    list.AddLast(composite.Trim());
+                }
+                ActiveMQDestination[] compositeDestinations = new ActiveMQDestination[list.Count];
+                int counter = 0;
+                foreach(String destination in list)
+                {
+                    compositeDestinations[counter++] = CreateDestination(destination);
+                }
+
+                return compositeDestinations;
+            }
+
+            return new ActiveMQDestination[0];
+        }
 
 		/// <summary>
 		/// </summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Jul  8 17:46:16 2011
@@ -18,6 +18,7 @@
 using System;
 using System.Diagnostics;
 using System.Collections;
+using System.Collections.Generic;
 using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Threads;
@@ -46,6 +47,7 @@ namespace Apache.NMS.ActiveMQ
 		private bool dispatchAsync = true;
 		private int producerWindowSize = 0;
 		private bool messagePrioritySupported=true;
+        private bool watchTopicAdviosires = true;
 
 		private bool userSpecifiedClientID;
 		private readonly Uri brokerUri;
@@ -58,6 +60,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
 		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
 		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
 		private readonly object connectedLock = new object();
 		private readonly Atomic<bool> connected = new Atomic<bool>(false);
 		private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -74,9 +77,11 @@ namespace Apache.NMS.ActiveMQ
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 		private readonly IdGenerator clientIdGenerator;
+        private int consumerIdCounter = 0;
 		private volatile CountDownLatch transportInterruptionProcessingComplete;
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+        private AdvisoryConsumer advisoryConsumer = null;
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -319,6 +324,12 @@ namespace Apache.NMS.ActiveMQ
 			set { this.dispatchAsync = value; }
 		}
 
+        public bool WatchTopicAdvisories
+        {
+            get { return this.watchTopicAdviosires; }
+            set { this.watchTopicAdviosires = value; }
+        }
+
 		public string ClientId
 		{
 			get { return info.ClientId; }
@@ -535,6 +546,12 @@ namespace Apache.NMS.ActiveMQ
 					Tracer.Info("Connection.Close(): Closing Connection Now.");
 					this.closing.Value = true;
 
+                    if(this.advisoryConsumer != null)
+                    {
+                        this.advisoryConsumer.Dispose();
+                        this.advisoryConsumer = null;
+                    }
+
 					lock(sessions.SyncRoot)
 					{
 						foreach(Session session in sessions)
@@ -544,6 +561,11 @@ namespace Apache.NMS.ActiveMQ
 					}
 					sessions.Clear();
 
+                    foreach(ActiveMQTempDestination dest in new ArrayList(this.tempDests.Values))
+                    {
+                        dest.Delete(); // may remove entries from tempDests, hence the snapshot above
+                    }
+
 					// Connected is true only when we've successfully sent our ConnectionInfo
 					// to the broker, so if we haven't announced ourselves there's no need to
 					// inform the broker of a remove, and if the transport is failed, why bother.
@@ -733,6 +755,13 @@ namespace Apache.NMS.ActiveMQ
 										if(!(response is ExceptionResponse))
 										{
 											connected.Value = true;
+                                            if(this.watchTopicAdviosires)
+                                            {
+                                                ConsumerId id = new ConsumerId(
+                                                    new SessionId(info.ConnectionId, -1),
+                                                    Interlocked.Increment(ref this.consumerIdCounter));
+                                                this.advisoryConsumer = new AdvisoryConsumer(this, id);
+                                            }
 										}
 									}
 								}
@@ -1090,6 +1119,7 @@ namespace Apache.NMS.ActiveMQ
 			this.SyncRequest(command);
 
 			destination.Connection = this;
+            this.AddTempDestination(destination);
 
 			return destination;
 		}
@@ -1100,6 +1130,19 @@ namespace Apache.NMS.ActiveMQ
 
 		public void DeleteTemporaryDestination(IDestination destination)
 		{
+            CheckClosedOrFailed();
+
+            ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
+
+            foreach(Session session in this.sessions)
+            {
+                if(session.IsInUse(temp))
+                {
+                    throw new NMSException("A consumer is consuming from the temporary destination");
+                }
+            }
+
+            this.tempDests.Remove(destination as ActiveMQTempDestination);
 			this.DeleteDestination(destination);
 		}
 
@@ -1184,5 +1227,49 @@ namespace Apache.NMS.ActiveMQ
 				}
 			}
 		}
+
+        internal void AddTempDestination(ActiveMQTempDestination dest)
+        {
+            // .NET lacks a putIfAbsent operation for Maps.
+            lock(tempDests.SyncRoot)
+            {
+                if(!this.tempDests.Contains(dest))
+                {
+                    this.tempDests.Add(dest, dest);
+                }
+            }
+        }
+
+        internal void RemoveTempDestination(ActiveMQTempDestination dest)
+        {
+            this.tempDests.Remove(dest);
+        }
+
+        internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
+        {
+            if(this.advisoryConsumer == null)
+            {
+                return true;
+            }
+
+            return this.tempDests.Contains(dest);
+        }
+
+        protected void CheckClosedOrFailed()
+        {
+            CheckClosed();
+            if (transportFailed.Value)
+            {
+                throw new ConnectionFailedException(firstFailureError.Message);
+            }
+        }
+
+        protected void CheckClosed()
+        {
+            if(closed.Value)
+            {
+                throw new ConnectionClosedException();
+            }
+        }
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Jul  8 17:46:16 2011
@@ -51,6 +51,7 @@ namespace Apache.NMS.ActiveMQ
 		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
 		private bool messagePrioritySupported=true;
+        private bool watchTopicAdvisories=true;
 
 		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -267,6 +268,12 @@ namespace Apache.NMS.ActiveMQ
 			set { this.dispatchAsync = value; }
 		}
 
+        public bool WatchTopicAdvisories
+        {
+            get { return this.watchTopicAdvisories; }
+            set { this.watchTopicAdvisories = value; }
+        }
+
 		public bool MessagePrioritySupported
 		{
 			get { return this.messagePrioritySupported; }
@@ -410,6 +417,7 @@ namespace Apache.NMS.ActiveMQ
 			connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
 			connection.ConsumerTransformer = this.consumerTransformer;
 			connection.ProducerTransformer = this.producerTransformer;
+            connection.WatchTopicAdvisories = this.watchTopicAdvisories;
 		}
 
 		protected static void ExceptionHandler(Exception ex)

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs Fri Jul  8 17:46:16 2011
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Exception thrown when a connection is used after it has failed in some way.
+    /// </summary>
+    [Serializable]
+    public class ConnectionFailedException : NMSException
+    {
+        public ConnectionFailedException()
+            : base("The connection has failed!")
+        {
+        }
+
+        public ConnectionFailedException(string message)
+            : base(message)
+        {
+        }
+
+        public ConnectionFailedException(string message, string errorCode)
+            : base(message, errorCode)
+        {
+        }
+
+        public ConnectionFailedException(string message, Exception innerException)
+            : base(message, innerException)
+        {
+        }
+
+        public ConnectionFailedException(string message, string errorCode, Exception innerException)
+            : base(message, errorCode, innerException)
+        {
+        }
+
+        #region ISerializable interface implementation
+
+        /// <summary>
+        /// Initializes a new instance of the ConnectionFailedException class with serialized data.
+        /// Throws System.ArgumentNullException if the info parameter is null.
+        /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+        /// </summary>
+        /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+        /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+        protected ConnectionFailedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+            : base(info, context)
+        {
+        }
+
+        #endregion
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFailedException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Jul  8 17:46:16 2011
@@ -74,7 +74,32 @@ namespace Apache.NMS.ActiveMQ
 			if(destination == null)
 			{
 				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
-			}
+            }
+            else if(destination.PhysicalName == null)
+            {
+                throw new InvalidDestinationException("The destination object was not given a physical name.");
+            }
+            else if (destination.IsTemporary)
+            {
+                String physicalName = destination.PhysicalName;
+
+                if(String.IsNullOrEmpty(physicalName))
+                {
+                    throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
+                }
+    
+                String connectionID = session.Connection.ConnectionId.Value;
+
+                if(physicalName.IndexOf(connectionID) < 0)
+                {
+                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
+                }
+    
+                if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
+                {
+                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
+                }
+            }
 
 			this.session = session;
 			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
@@ -1201,6 +1226,11 @@ namespace Apache.NMS.ActiveMQ
 			get { return this.session.IsClientAcknowledge; }
 		}
 
+        internal bool IsInUse(ActiveMQTempDestination dest)
+        {
+            return this.info.Destination.Equals(dest);
+        }
+
 		#region Nested ISyncronization Types
 
 		class MessageConsumerSynchronization : ISynchronization

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Fri Jul  8 17:46:16 2011
@@ -255,7 +255,7 @@ namespace Apache.NMS.ActiveMQ
 					throw new ConnectionClosedException();
 				}
 
-				session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
+				session.DoSend(dest, activeMessage, this, this.usage, this.RequestTimeout);
 			}
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1144402&r1=1144401&r2=1144402&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Jul  8 17:46:16 2011
@@ -655,10 +655,16 @@ namespace Apache.NMS.ActiveMQ
 
         #endregion
 
-        internal void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
+        internal void DoSend(ActiveMQDestination destination, ActiveMQMessage message,
+                             MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout)
         {
             ActiveMQMessage msg = message;
 
+            if(destination.IsTemporary && !connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
+            {
+                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
+            }
+
             if(IsTransacted)
             {
                 DoStartTransaction();
@@ -960,5 +966,17 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
+        internal bool IsInUse(ActiveMQTempDestination dest)
+        {
+            foreach(MessageConsumer consumer in this.consumers.Values)
+            {
+                if(consumer.IsInUse(dest))
+                {
+                    return true;
+                }
+            }
+
+            return false;
+        }
     }
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs Fri Jul  8 17:46:16 2011
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    public class AdvisorySupport
+    {
+        public static readonly String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
+        public static readonly ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Connection");
+        public static readonly ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
+        public static readonly ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
+        public static readonly ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
+        public static readonly ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
+        public static readonly String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
+        public static readonly String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
+        public static readonly String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+        public static readonly String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
+        public static readonly String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
+        public static readonly String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
+        public static readonly String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
+        public static readonly String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
+        public static readonly String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
+        public static readonly String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
+        public static readonly String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
+        public static readonly String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
+        public static readonly String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
+        public static readonly String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
+        public static readonly String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
+        public static readonly String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+        public static readonly String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
+        public static readonly String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+        public static readonly String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
+        public static readonly String AGENT_TOPIC = "ActiveMQ.Agent";
+        public static readonly String ADIVSORY_MESSAGE_TYPE = "Advisory";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
+        public static readonly String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
+        public static readonly String MSG_PROPERTY_USAGE_NAME = "usageName";
+        public static readonly String MSG_PROPERTY_CONSUMER_ID = "consumerId";
+        public static readonly String MSG_PROPERTY_PRODUCER_ID = "producerId";
+        public static readonly String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
+        public static readonly String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
+        public static readonly String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
+
+        public static readonly ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
+                TEMP_QUEUE_ADVISORY_TOPIC.PhysicalName + "," + TEMP_TOPIC_ADVISORY_TOPIC.PhysicalName);
+
+        private AdvisorySupport()
+        {
+        }
+
+        public static ActiveMQTopic GetConnectionAdvisoryTopic()
+        {
+            return CONNECTION_ADVISORY_TOPIC;
+        }
+
+        public static ActiveMQTopic GetConsumerAdvisoryTopic(IDestination destination)
+        {
+            return GetConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsQueue)
+            {
+                return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.PhysicalName);
+            }
+            else
+            {
+                return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.PhysicalName);
+            }
+        }
+    
+        public static ActiveMQTopic GetProducerAdvisoryTopic(IDestination destination)
+        {
+            return GetProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsQueue)
+            {
+                return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.PhysicalName);
+            }
+            else
+            {
+                return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.PhysicalName);
+            }
+        }
+    
+        public static ActiveMQTopic GetExpiredMessageTopic(IDestination destination)
+        {
+            return GetExpiredMessageTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetExpiredMessageTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsQueue)
+            {
+                return GetExpiredQueueMessageAdvisoryTopic(destination);
+            }
+            return GetExpiredTopicMessageAdvisoryTopic(destination);
+        }
+    
+        public static ActiveMQTopic GetExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetExpiredQueueMessageAdvisoryTopic(IDestination destination)
+        {
+            return GetExpiredQueueMessageAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetNoTopicConsumersAdvisoryTopic(IDestination destination)
+        {
+            return GetNoTopicConsumersAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetNoQueueConsumersAdvisoryTopic(IDestination destination)
+        {
+            return GetNoQueueConsumersAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetSlowConsumerAdvisoryTopic(IDestination destination)
+        {
+            return GetSlowConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetSlowConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetFastProducerAdvisoryTopic(IDestination destination)
+        {
+            return GetFastProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetFastProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = FAST_PRODUCER_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+
+        public static ActiveMQTopic GetMessageDiscardedAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageDiscardedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetMessageDiscardedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetMessageDeliveredAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageDeliveredAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetMessageDeliveredAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+
+        public static ActiveMQTopic GetMessageConsumedAdvisoryTopic(IDestination destination)
+        {
+            return GetMessageConsumedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetMessageConsumedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+
+        public static ActiveMQTopic GetMessageDLQdAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetMasterBrokerAdvisoryTopic(IDestination destination)
+        {
+            return GetMasterBrokerAdvisoryTopic(); // delegate to the no-arg overload; passing a transformed destination back in re-binds to this overload and recurses forever
+        }
+    
+        public static ActiveMQTopic GetMasterBrokerAdvisoryTopic()
+        {
+            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+        }
+
+        public static ActiveMQTopic GetNetworkBridgeAdvisoryTopic()
+        {
+            return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
+        }
+    
+        public static ActiveMQTopic GetFullAdvisoryTopic(IDestination destination)
+        {
+            return GetFullAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static ActiveMQTopic GetFullAdvisoryTopic(ActiveMQDestination destination)
+        {
+            String name = FULL_TOPIC_PREFIX + destination.GetDestinationTypeAsString() + "."
+                    + destination.PhysicalName;
+            return new ActiveMQTopic(name);
+        }
+    
+        public static ActiveMQTopic GetDestinationAdvisoryTopic(IDestination destination)
+        {
+            return GetDestinationAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static ActiveMQTopic GetDestinationAdvisoryTopic(ActiveMQDestination destination)
+        {
+            switch (destination.GetDestinationType())
+            {
+                case ActiveMQDestination.ACTIVEMQ_QUEUE:
+                    return QUEUE_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TOPIC:
+                    return TOPIC_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
+                    return TEMP_QUEUE_ADVISORY_TOPIC;
+                case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
+                    return TEMP_TOPIC_ADVISORY_TOPIC;
+                default:
+                    throw new NMSException("Unknown destination type: " + destination.DestinationType);
+            }
+        }
+
+        public static bool IsDestinationAdvisoryTopic(IDestination destination)
+        {
+            return IsDestinationAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static bool IsDestinationAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite)
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsDestinationAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else
+            {
+                return destination.Equals(TEMP_QUEUE_ADVISORY_TOPIC) ||
+                       destination.Equals(TEMP_TOPIC_ADVISORY_TOPIC) ||
+                       destination.Equals(QUEUE_ADVISORY_TOPIC) ||
+                       destination.Equals(TOPIC_ADVISORY_TOPIC);
+            }
+        }
+
+        public static bool IsAdvisoryTopic(IDestination destination)
+        {
+            return IsAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(ADVISORY_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+
+        public static bool IsConnectionAdvisoryTopic(IDestination destination)
+        {
+            return IsConnectionAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsConnectionAdvisoryTopic(ActiveMQDestination destination) {
+            if (destination.IsComposite)
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsConnectionAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else
+            {
+                return destination.Equals(CONNECTION_ADVISORY_TOPIC);
+            }
+        }
+    
+        public static bool IsProducerAdvisoryTopic(IDestination destination)
+        {
+            return IsProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsProducerAdvisoryTopic(ActiveMQDestination destination) {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsProducerAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(PRODUCER_ADVISORY_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsConsumerAdvisoryTopic(IDestination destination)
+        {
+            return IsConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsConsumerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsConsumerAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(CONSUMER_ADVISORY_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsSlowConsumerAdvisoryTopic(IDestination destination)
+        {
+            return IsSlowConsumerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsSlowConsumerAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(SLOW_CONSUMER_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsFastProducerAdvisoryTopic(IDestination destination)
+        {
+            return IsFastProducerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsFastProducerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsFastProducerAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(FAST_PRODUCER_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsMessageConsumedAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageConsumedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsMessageConsumedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsMessageConsumedAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(MESSAGE_CONSUMED_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsMasterBrokerAdvisoryTopic(IDestination destination)
+        {
+            return IsMasterBrokerAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static bool IsMasterBrokerAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsMasterBrokerAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(MASTER_BROKER_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsMessageDeliveredAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageDeliveredAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+    
+        public static bool IsMessageDeliveredAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsMessageDeliveredAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(MESSAGE_DELIVERED_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsMessageDiscardedAdvisoryTopic(IDestination destination)
+        {
+            return IsMessageDiscardedAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static bool IsMessageDiscardedAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsMessageDiscardedAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    
+        public static bool IsFullAdvisoryTopic(IDestination destination)
+        {
+            return IsFullAdvisoryTopic(ActiveMQDestination.Transform(destination));
+        }
+
+        public static bool IsFullAdvisoryTopic(ActiveMQDestination destination)
+        {
+            if (destination.IsComposite) // a composite matches when any of its members matches
+            {
+                ActiveMQDestination[] compositeDestinations = destination.GetCompositeDestinations();
+                for (int i = 0; i < compositeDestinations.Length; i++)
+                {
+                    if (IsFullAdvisoryTopic(compositeDestinations[i]))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+            }
+            else // plain destination: ordinal (culture-independent) prefix match on the topic name
+            {
+                return destination.IsTopic && destination.PhysicalName.StartsWith(FULL_TOPIC_PREFIX, StringComparison.Ordinal);
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/AdvisorySupport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs?rev=1144402&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs Fri Jul  8 17:46:16 2011
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Test;
+using Apache.NMS.Util;
+using Apache.NMS.Policies;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class TempDestinationTest : NMSTestSupport
+    {
+        private Connection connection;
+        private readonly IList connections = ArrayList.Synchronized(new ArrayList());
+
+        /// <summary>Runs base set-up, then opens and tracks the fixture connection; the direct cast fails fast on a wrong type.</summary>
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+            connection = (Connection) CreateConnection();
+            connections.Add(connection);
+        }
+
+        /// <summary>Closes every tracked connection (logging, not rethrowing, close errors), then runs the base tear-down.</summary>
+        [TearDown]
+        public override void TearDown()
+        {
+            foreach(Connection tracked in connections)
+            {
+                try
+                {
+                    tracked.Close();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Debug("Ignoring error while closing test connection: " + ex.Message);
+                }
+            }
+            connections.Clear();
+            base.TearDown();
+        }
+
+        /// <summary>
+        /// Verifies that a temporary queue can only be consumed through the connection that created it.
+        /// </summary>
+        [Test]
+        public void TestTempDestOnlyConsumedByLocalConn()
+        {
+            connection.Start();
+
+            // Publish one message to a temporary queue created on the fixture connection.
+            ISession ownerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue ownerQueue = ownerSession.CreateTemporaryQueue();
+            IMessageProducer producer = ownerSession.CreateProducer(ownerQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage message = ownerSession.CreateTextMessage("First");
+            producer.Send(message);
+
+            // A second connection reading its own temp queue must not receive that message.
+            Connection otherConnection = (Connection) CreateConnection();
+            connections.Add(otherConnection);
+            ISession otherSession = otherConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue otherQueue = otherSession.CreateTemporaryQueue();
+            IMessageConsumer consumer = otherSession.CreateConsumer(otherQueue);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(3000)));
+
+            // Creating a consumer on the first connection's temp queue from the second
+            // connection is expected to throw InvalidDestinationException.
+            try
+            {
+                otherSession.CreateConsumer(ownerQueue);
+                Assert.Fail("CreateConsumer should fail since the temp destination belongs to another connection");
+            }
+            catch(InvalidDestinationException)
+            {
+                // Expected: consuming a foreign temp destination was rejected.
+            }
+
+            // The owning connection can still consume the message it published.
+            consumer = ownerSession.CreateConsumer(ownerQueue);
+            IMessage received = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+            Assert.NotNull(received);
+        }
+
+        /// <summary>
+        /// Verifies that a temp queue with an active consumer does not drop a message sent to it.
+        /// </summary>
+        [Test]
+        public void TestTempQueueHoldsMessagesWithConsumers()
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue tempQueue = session.CreateTemporaryQueue();
+            IMessageConsumer consumer = session.CreateConsumer(tempQueue);
+            connection.Start();
+
+            IMessageProducer producer = session.CreateProducer(tempQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage sent = session.CreateTextMessage("Hello");
+            producer.Send(sent);
+
+            IMessage received = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(received);
+            Assert.IsTrue(received is ITextMessage, "Expected message to be a TextMessage");
+            string failureText = "Expected message to be a '" + sent.Text + "'";
+            Assert.IsTrue(((ITextMessage)received).Text.Equals(sent.Text), failureText);
+        }
+
+        /// <summary>
+        /// Verifies that a temp queue keeps a message that was sent before any consumer existed.
+        /// </summary>
+        [Test]
+        public void TestTempQueueHoldsMessagesWithoutConsumers()
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue tempQueue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(tempQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage sent = session.CreateTextMessage("Hello");
+            producer.Send(sent);
+
+            // The connection is started and the consumer created only after the send.
+            connection.Start();
+            IMessageConsumer consumer = session.CreateConsumer(tempQueue);
+            IMessage received = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+            Assert.IsNotNull(received);
+            Assert.IsTrue(received is ITextMessage, "Expected message to be a TextMessage");
+            string failureText = "Expected message to be a '" + sent.Text + "'";
+            Assert.IsTrue(((ITextMessage)received).Text.Equals(sent.Text), failureText);
+        }
+    
+        /// <summary>
+        /// Sends 500 bytes messages of 1024 bytes to a temp queue, then checks each one comes back in order.
+        /// </summary>
+        [Test]
+        public void TestTmpQueueWorksUnderLoad()
+        {
+            const int messageCount = 500;
+            const int payloadSize = 1024;
+
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue tempQueue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(tempQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+            List<IMessage> sentMessages = new List<IMessage>(messageCount);
+            byte[] payload = new byte[payloadSize];
+            for (int index = 0; index < messageCount; index++)
+            {
+                IBytesMessage outgoing = session.CreateBytesMessage();
+                outgoing.WriteBytes(payload);
+                outgoing.Properties.SetInt("c", index);
+                producer.Send(outgoing);
+                sentMessages.Add(outgoing);
+            }
+
+            connection.Start();
+            IMessageConsumer consumer = session.CreateConsumer(tempQueue);
+            for (int expected = 0; expected < messageCount; expected++)
+            {
+                IMessage incoming = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsTrue(incoming != null);
+                Assert.AreEqual(expected, incoming.Properties.GetInt("c"));
+                Assert.IsTrue(incoming.Equals(sentMessages[expected]));
+            }
+        }
+    
+        /// <summary>
+        /// Verifies that sending to a temp queue fails once the connection that created it is closed.
+        /// </summary>
+        [Test]
+        public void TestPublishFailsForClosedConnection()
+        {
+            // The temp queue is created on a second connection; the fixture connection only produces.
+            Connection ownerConnection = (Connection) CreateConnection();
+            connections.Add(ownerConnection);
+            ISession ownerSession = ownerConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue tempQueue = ownerSession.CreateTemporaryQueue();
+
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            connection.Start();
+
+            // While the owning connection is still open this send is expected to succeed.
+            IMessageProducer producer = session.CreateProducer(tempQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage first = session.CreateTextMessage("First");
+            producer.Send(first);
+            Thread.Sleep(1000);
+
+            // Closing the owning connection should destroy its temp queue; pause so the
+            // removal has time to take effect before the next send.
+            ownerConnection.Close();
+            Thread.Sleep(5000);
+
+            // With the owning connection closed, this send is expected to throw
+            // an NMSException instead of being delivered.
+            try
+            {
+                ITextMessage rejected = session.CreateTextMessage("Hello");
+                producer.Send(rejected);
+                Assert.Fail("Send should fail since temp destination should not exist anymore.");
+            }
+            catch(NMSException e)
+            {
+                Tracer.Debug("Test threw expected exception: " + e.Message);
+            }
+        }
+    
+        /// <summary>
+        /// Verifies that sending to a temp queue fails after the queue has been explicitly deleted.
+        /// </summary>
+        [Test]
+        public void TestPublishFailsForDestoryedTempDestination()
+        {
+            Connection ownerConnection = (Connection) CreateConnection();
+            connections.Add(ownerConnection);
+            ISession ownerSession = ownerConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue tempQueue = ownerSession.CreateTemporaryQueue();
+
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            connection.Start();
+
+            // The temp queue has not been deleted yet, so this send is expected
+            // to succeed.
+            IMessageProducer producer = session.CreateProducer(tempQueue);
+            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+            ITextMessage first = session.CreateTextMessage("First");
+            producer.Send(first);
+            Thread.Sleep(1000);
+
+            // Delete the queue (its owning connection is left open) and pause so the
+            // deletion has time to take effect.
+            tempQueue.Delete();
+            Thread.Sleep(5000);
+
+            // The queue was deleted, so this send is expected to throw NMSException.
+            try
+            {
+                ITextMessage rejected = session.CreateTextMessage("Hello");
+                producer.Send(rejected);
+                Assert.Fail("Send should fail since temp destination should not exist anymore.");
+            }
+            catch(NMSException e)
+            {
+                // Expected outcome; reaching this handler is the pass condition.
+                Tracer.Debug("Test threw expected exception: " + e.Message);
+            }
+        }
+    
+        /// <summary>
+        /// Verifies that deleting a temp queue that still has a consumer throws NMSException.
+        /// </summary>
+        [Test]
+        public void TestDeleteDestinationWithSubscribersFails()
+        {
+            Connection localConnection = (Connection) CreateConnection();
+            connections.Add(localConnection);
+            ISession session = localConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            ITemporaryQueue tempQueue = session.CreateTemporaryQueue();
+            localConnection.Start();
+
+            // Attach a consumer so the queue has a subscriber when Delete is called.
+            session.CreateConsumer(tempQueue);
+
+            try
+            {
+                tempQueue.Delete();
+                Assert.Fail("Should fail as Subscribers are active");
+            }
+            catch(NMSException)
+            {
+                // Expected: the delete was rejected.
+            }
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message