Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 269989B4D for ; Wed, 18 Apr 2012 22:33:02 +0000 (UTC) Received: (qmail 42210 invoked by uid 500); 18 Apr 2012 22:33:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 42179 invoked by uid 500); 18 Apr 2012 22:33:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42171 invoked by uid 99); 18 Apr 2012 22:33:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2012 22:33:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2012 22:32:56 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B084D2388860 for ; Wed, 18 Apr 2012 22:32:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1327710 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Date: Wed, 18 Apr 2012 22:32:36 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120418223236.B084D2388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Apr 18 22:32:36 2012 New Revision: 1327710 URL: http://svn.apache.org/viewvc?rev=1327710&view=rev Log: fix for: https://issues.apache.org/jira/browse/AMQNET-377 Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs?rev=1327710&r1=1327709&r2=1327710&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Connection.cs Wed Apr 18 22:32:36 2012 @@ -29,1270 +29,1270 @@ using Apache.NMS.Util; namespace Apache.NMS.ActiveMQ { - /// - /// Represents a connection with a message broker - /// - public class Connection : IConnection - { - private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); - - // Uri configurable options. - private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; - private bool asyncSend = false; - private bool alwaysSyncSend = false; - private bool asyncClose = true; - private bool useCompression = false; - private bool copyMessageOnSend = true; - private bool sendAcksAsync = false; - private bool dispatchAsync = true; - private int producerWindowSize = 0; - private bool messagePrioritySupported = true; - private bool watchTopicAdviosires = true; - - private bool userSpecifiedClientID; - private readonly Uri brokerUri; - private ITransport transport; - private readonly ConnectionInfo info; - private TimeSpan requestTimeout; // from connection factory - private BrokerInfo brokerInfo; // from broker - private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1); - private WireFormatInfo brokerWireFormatInfo; // from broker - private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); - private readonly IDictionary producers = Hashtable.Synchronized(new 
Hashtable()); - private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable()); - private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable()); - private readonly object myLock = new object(); - private readonly Atomic connected = new Atomic(false); - private readonly Atomic closed = new Atomic(false); - private readonly Atomic closing = new Atomic(false); - private readonly Atomic transportFailed = new Atomic(false); - private Exception firstFailureError = null; - private int sessionCounter = 0; - private int temporaryDestinationCounter = 0; - private int localTransactionCounter; - private readonly Atomic started = new Atomic(false); - private ConnectionMetaData metaData = null; - private bool disposed = false; - private IRedeliveryPolicy redeliveryPolicy; - private PrefetchPolicy prefetchPolicy = new PrefetchPolicy(); - private ICompressionPolicy compressionPolicy = new CompressionPolicy(); - private readonly IdGenerator clientIdGenerator; - private int consumerIdCounter = 0; - private volatile CountDownLatch transportInterruptionProcessingComplete; - private readonly MessageTransformation messageTransformation; - private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor(); - private AdvisoryConsumer advisoryConsumer = null; - - public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator) - { - this.brokerUri = connectionUri; - this.clientIdGenerator = clientIdGenerator; - - this.transport = transport; - this.transport.Command = new CommandHandler(OnCommand); - this.transport.Exception = new ExceptionHandler(OnTransportException); - this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted); - this.transport.Resumed = new ResumedHandler(OnTransportResumed); - - ConnectionId id = new ConnectionId(); - id.Value = CONNECTION_ID_GENERATOR.GenerateId(); - - this.info = new ConnectionInfo(); - this.info.ConnectionId = id; - this.info.FaultTolerant = 
transport.IsFaultTolerant; - - this.messageTransformation = new ActiveMQMessageTransformation(this); - } - - ~Connection() - { - Dispose(false); - } - - /// - /// A delegate that can receive transport level exceptions. - /// - public event ExceptionListener ExceptionListener; - - /// - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been interrupted. - /// - public event ConnectionInterruptedListener ConnectionInterruptedListener; - - /// - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been resumed. - /// - public event ConnectionResumedListener ConnectionResumedListener; - - private ConsumerTransformerDelegate consumerTransformer; - public ConsumerTransformerDelegate ConsumerTransformer - { - get { return this.consumerTransformer; } - set { this.consumerTransformer = value; } - } - - private ProducerTransformerDelegate producerTransformer; - public ProducerTransformerDelegate ProducerTransformer - { - get { return this.producerTransformer; } - set { this.producerTransformer = value; } - } - - #region Properties - - public String UserName - { - get { return this.info.UserName; } - set { this.info.UserName = value; } - } - - public String Password - { - get { return this.info.Password; } - set { this.info.Password = value; } - } - - /// - /// This property indicates what version of the Protocol we are using to - /// communicate with the Broker, if not set we return the lowest version - /// number to indicate we support only the basic command set. - /// - public int ProtocolVersion - { - get - { - if(brokerWireFormatInfo != null) - { - return brokerWireFormatInfo.Version; - } - - return 1; - } - } - - /// - /// This property indicates whether or not async send is enabled. - /// - public bool AsyncSend - { - get { return asyncSend; } - set { asyncSend = value; } - } - - /// - /// This property indicates whether or not async close is enabled. 
- /// When the connection is closed, it will either send a synchronous - /// DisposeOf command to the broker and wait for confirmation (if true), - /// or it will send the DisposeOf command asynchronously. - /// - public bool AsyncClose - { - get { return asyncClose; } - set { asyncClose = value; } - } - - /// - /// This property indicates whether or not async sends are used for - /// message acknowledgement messages. Sending Acks async can improve - /// performance but may decrease reliability. - /// - public bool SendAcksAsync - { - get { return sendAcksAsync; } - set { sendAcksAsync = value; } - } - - /// - /// This property sets the acknowledgment mode for the connection. - /// The URI parameter connection.ackmode can be set to a string value - /// that maps to the enumeration value. - /// - public string AckMode - { - set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); } - } - - /// - /// This property is the maximum number of bytes in memory that a producer will transmit - /// to a broker before waiting for acknowledgement messages from the broker that it has - /// accepted the previously sent messages. In other words, this how you configure the - /// producer flow control window that is used for async sends where the client is responsible - /// for managing memory usage. The default value of 0 means no flow control at the client - /// - public int ProducerWindowSize - { - get { return producerWindowSize; } - set { producerWindowSize = value; } - } - - /// - /// This property forces all messages that are sent to be sent synchronously overriding - /// any usage of the AsyncSend flag. This can reduce performance in some cases since the - /// only messages we normally send synchronously are Persistent messages not sent in a - /// transaction. 
This options guarantees that no send will return until the broker has - /// acknowledge receipt of the message - /// - public bool AlwaysSyncSend - { - get { return alwaysSyncSend; } - set { alwaysSyncSend = value; } - } - - /// - /// This property indicates whether Message's should be copied before being sent via - /// one of the Connection's send methods. Copying the Message object allows the user - /// to resuse the Object over for another send. If the message isn't copied performance - /// can improve but the user must not reuse the Object as it may not have been sent - /// before they reset its payload. - /// - public bool CopyMessageOnSend - { - get { return copyMessageOnSend; } - set { copyMessageOnSend = value; } - } - - /// - /// Enable or Disable the use of Compression on Message bodies. When enabled all - /// messages have their body compressed using the Deflate compression algorithm. - /// The recipient of the message must support the use of message compression as well - /// otherwise the receiving client will receive a message whose body appears in the - /// compressed form. - /// - public bool UseCompression - { - get { return this.useCompression; } - set { this.useCompression = value; } - } - - /// - /// Indicate whether or not the resources of this Connection should support the - /// Message Priority value of incoming messages and dispatch them accordingly. - /// When disabled Message are always dispatched to Consumers in FIFO order. - /// - public bool MessagePrioritySupported - { - get { return this.messagePrioritySupported; } - set { this.messagePrioritySupported = value; } - } - - public IConnectionMetaData MetaData - { - get { return this.metaData ?? 
(this.metaData = new ConnectionMetaData()); } - } - - public Uri BrokerUri - { - get { return brokerUri; } - } - - public ITransport ITransport - { - get { return transport; } - set { this.transport = value; } - } - - public bool TransportFailed - { - get { return this.transportFailed.Value; } - } - - public Exception FirstFailureError - { - get { return this.firstFailureError; } - } - - public TimeSpan RequestTimeout - { - get { return this.requestTimeout; } - set { this.requestTimeout = value; } - } - - public AcknowledgementMode AcknowledgementMode - { - get { return acknowledgementMode; } - set { this.acknowledgementMode = value; } - } - - /// - /// synchronously or asynchronously by the broker. - /// - public bool DispatchAsync - { - get { return this.dispatchAsync; } - set { this.dispatchAsync = value; } - } - - public bool WatchTopicAdvisories - { - get { return this.watchTopicAdviosires; } - set { this.watchTopicAdviosires = value; } - } - - public string ClientId - { - get { return info.ClientId; } - set - { - if(this.connected.Value) - { - throw new NMSException("You cannot change the ClientId once the Connection is connected"); - } - - this.info.ClientId = value; - this.userSpecifiedClientID = true; - CheckConnected(); - } - } - - /// - /// The Default Client Id used if the ClientId property is not set explicity. - /// - public string DefaultClientId - { - set - { - this.info.ClientId = value; - this.userSpecifiedClientID = true; - } - } - - public ConnectionId ConnectionId - { - get { return info.ConnectionId; } - } - - public BrokerInfo BrokerInfo - { - get { return brokerInfo; } - } - - public WireFormatInfo BrokerWireFormat - { - get { return brokerWireFormatInfo; } - } - - public String ResourceManagerId - { - get - { - this.brokerInfoReceived.await(); - return brokerInfo.BrokerId.Value; - } - } - - /// - /// Get/or set the redelivery policy for this connection. 
- /// - public IRedeliveryPolicy RedeliveryPolicy - { - get { return this.redeliveryPolicy; } - set { this.redeliveryPolicy = value; } - } - - public PrefetchPolicy PrefetchPolicy - { - get { return this.prefetchPolicy; } - set { this.prefetchPolicy = value; } - } - - public ICompressionPolicy CompressionPolicy - { - get { return this.compressionPolicy; } - set { this.compressionPolicy = value; } - } - - internal MessageTransformation MessageTransformation - { - get { return this.messageTransformation; } - } - - #endregion - - /// - /// Starts asynchronous message delivery of incoming messages for this connection. - /// Synchronous delivery is unaffected. - /// - public void Start() - { - CheckConnected(); - if(started.CompareAndSet(false, true)) - { - lock(sessions.SyncRoot) - { - foreach(Session session in sessions) - { - session.Start(); - } - } - } - } - - /// - /// This property determines if the asynchronous message delivery of incoming - /// messages has been started for this connection. - /// - public bool IsStarted - { - get { return started.Value; } - } - - /// - /// Temporarily stop asynchronous delivery of inbound messages for this connection. - /// The sending of outbound messages is unaffected. 
- /// - public void Stop() - { - if(started.CompareAndSet(true, false)) - { - lock(sessions.SyncRoot) - { - foreach(Session session in sessions) - { - session.Stop(); - } - } - } - } - - /// - /// Creates a new session to work on this connection - /// - public ISession CreateSession() - { - return CreateAtiveMQSession(acknowledgementMode); - } - - /// - /// Creates a new session to work on this connection - /// - public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode) - { - return CreateAtiveMQSession(sessionAcknowledgementMode); - } - - protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode) - { - CheckConnected(); - return new Session(this, NextSessionId, ackMode); - } - - internal void AddSession(Session session) - { - if(!this.closing.Value) - { - sessions.Add(session); - } - } - - internal void RemoveSession(Session session) - { - if(!this.closing.Value) - { - sessions.Remove(session); - } - } - - internal void addDispatcher(ConsumerId id, IDispatcher dispatcher) - { - if(!this.closing.Value) - { - this.dispatchers.Add(id, dispatcher); - } - } - - internal void removeDispatcher(ConsumerId id) - { - if(!this.closing.Value) - { - this.dispatchers.Remove(id); - } - } - - internal void addProducer(ProducerId id, MessageProducer producer) - { - if(!this.closing.Value) - { - this.producers.Add(id, producer); - } - } - - internal void removeProducer(ProducerId id) - { - if(!this.closing.Value) - { - this.producers.Remove(id); - } - } - - public void Close() - { - if(!this.closed.Value && !transportFailed.Value) - { - this.Stop(); - } - - lock(myLock) - { - if(this.closed.Value) - { - return; - } - - try - { - Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ConnectionId); - this.closing.Value = true; - - if(this.advisoryConsumer != null) - { - this.advisoryConsumer.Dispose(); - this.advisoryConsumer = null; - } - - lock(sessions.SyncRoot) - { - foreach(Session session in sessions) - { - session.Shutdown(); - 
} - } - sessions.Clear(); - - if(this.tempDests.Count > 0) - { - // Make a copy of the destinations to delete, because the act of deleting - // them will modify the collection. - ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count]; - - this.tempDests.Values.CopyTo(tempDestsToDelete, 0); - foreach(ActiveMQTempDestination dest in tempDestsToDelete) - { - dest.Delete(); - } - } - - // Connected is true only when we've successfully sent our ConnectionInfo - // to the broker, so if we haven't announced ourselves there's no need to - // inform the broker of a remove, and if the transport is failed, why bother. - if(connected.Value && !transportFailed.Value) - { - DisposeOf(ConnectionId); - ShutdownInfo shutdowninfo = new ShutdownInfo(); - transport.Oneway(shutdowninfo); - } - - executor.Shutdown(); - - Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ConnectionId); - transport.Dispose(); - } - catch(Exception ex) - { - Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ConnectionId, ex); - } - finally - { - if(executor != null) - { - executor.Shutdown(); - } - - this.transport = null; - this.closed.Value = true; - this.connected.Value = false; - this.closing.Value = false; - } - } - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected void Dispose(bool disposing) - { - if(disposed) - { - return; - } - - if(disposing) - { - // Dispose managed code here. - } - - try - { - Close(); - } - catch - { - // Ignore network errors. - } - - disposed = true; - } - - // Implementation methods - - /// - /// Performs a synchronous request-response with the broker - /// - /// - public Response SyncRequest(Command command) - { - return SyncRequest(command, this.RequestTimeout); - } - - /// - /// Performs a synchronous request-response with the broker for requested timeout duration. 
- /// - /// - /// - /// - public Response SyncRequest(Command command, TimeSpan requestTimeout) - { - CheckConnected(); - - try - { - Response response = transport.Request(command, requestTimeout); - if(response is ExceptionResponse) - { - ExceptionResponse exceptionResponse = (ExceptionResponse)response; - BrokerError brokerError = exceptionResponse.Exception; - throw new BrokerException(brokerError); - } - return response; - } - catch(Exception ex) - { - throw NMSExceptionSupport.Create(ex); - } - } - - public void Oneway(Command command) - { - CheckConnected(); - - try - { - transport.Oneway(command); - } - catch(Exception ex) - { - throw NMSExceptionSupport.Create(ex); - } - } - - private void DisposeOf(DataStructure objectId) - { - try - { - RemoveInfo command = new RemoveInfo(); - command.ObjectId = objectId; - if(asyncClose) - { - Tracer.DebugFormat("Connection[{0}]: Asynchronously disposing of Connection.", this.ConnectionId); - if(connected.Value) - { - transport.Oneway(command); - if (Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: Oneway command sent to broker: {1}", - this.ConnectionId, command); - } - } - } - else - { - // Ensure that the object is disposed to avoid potential race-conditions - // of trying to re-create the same object in the broker faster than - // the broker can dispose of the object. Allow up to 5 seconds to process. - Tracer.DebugFormat("Connection[{0}]: Synchronously disposing of Connection.", this.ConnectionId); - SyncRequest(command, TimeSpan.FromSeconds(5)); - Tracer.DebugFormat("Connection[{0}]: Synchronously closed of Connection.", this.ConnectionId); - } - } - catch // (BrokerException) - { - // Ignore exceptions while shutting down. - } - } - - private object checkConnectedLock = new object(); - - /// - /// Check and ensure that the connection objcet is connected. If it is not - /// connected or is closed, a ConnectionClosedException is thrown. 
- /// - internal void CheckConnected() - { - if(closed.Value) - { - throw new ConnectionClosedException(); - } - - if(!connected.Value) - { - DateTime timeoutTime = DateTime.Now + this.RequestTimeout; - int waitCount = 1; - - while(true) - { - if(Monitor.TryEnter(checkConnectedLock)) - { - try - { - if(!connected.Value) - { - if(!this.userSpecifiedClientID) - { - this.info.ClientId = this.clientIdGenerator.GenerateId(); - } - - try - { - if(null != transport) - { - // Send the connection and see if an ack/nak is returned. - Response response = transport.Request(this.info, this.RequestTimeout); - if(!(response is ExceptionResponse)) - { - connected.Value = true; - if(this.watchTopicAdviosires) - { - ConsumerId id = new ConsumerId( - new SessionId(info.ConnectionId, -1), - Interlocked.Increment(ref this.consumerIdCounter)); - this.advisoryConsumer = new AdvisoryConsumer(this, id); - } - } - } - } - catch - { - } - } - } - finally - { - Monitor.Exit(checkConnectedLock); - } - } - - if(connected.Value || DateTime.Now > timeoutTime) - { - break; - } - - // Back off from being overly aggressive. Having too many threads - // aggressively trying to connect to a down broker pegs the CPU. 
- Thread.Sleep(5 * (waitCount++)); - } - - if(!connected.Value) - { - throw new ConnectionClosedException(); - } - } - } - - /// - /// Handle incoming commands - /// - /// An ITransport - /// A Command - protected void OnCommand(ITransport commandTransport, Command command) - { - if(command.IsMessageDispatch) - { - WaitForTransportInterruptionProcessingToComplete(); - DispatchMessage((MessageDispatch)command); - } - else if(command.IsKeepAliveInfo) - { - OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command); - } - else if(command.IsWireFormatInfo) - { - this.brokerWireFormatInfo = (WireFormatInfo)command; - } - else if(command.IsBrokerInfo) - { - this.brokerInfo = (BrokerInfo)command; - this.brokerInfoReceived.countDown(); - } - else if(command.IsShutdownInfo) - { - // Only terminate the connection if the transport we use is not fault - // tolerant otherwise we let the transport deal with the broker closing - // our connection and deal with IOException if it is sent to use. - if(!closing.Value && !closed.Value && this.transport != null && !this.transport.IsFaultTolerant) - { - OnException(new NMSException("Broker closed this connection via Shutdown command.")); - } - } - else if(command.IsProducerAck) - { - ProducerAck ack = (ProducerAck)command as ProducerAck; - if(ack.ProducerId != null) - { - MessageProducer producer = producers[ack.ProducerId] as MessageProducer; - if(producer != null) - { - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: Received a new ProducerAck -> ", - this.ConnectionId, ack); - } - - producer.OnProducerAck(ack); - } - } - } - else if(command.IsConnectionError) - { - if(!closing.Value && !closed.Value) - { - ConnectionError connectionError = (ConnectionError)command; - BrokerError brokerError = connectionError.Exception; - string message = "Broker connection error."; - string cause = ""; - - if(null != brokerError) - { - message = brokerError.Message; - if(null != brokerError.Cause) - { - cause = 
brokerError.Cause.Message; - } - } - - Tracer.ErrorFormat("Connection[{0}]: ConnectionError: " + message + " : " + cause, this.ConnectionId); - OnException(new NMSConnectionException(message, cause)); - } - } - else - { - Tracer.ErrorFormat("Connection[{0}]: Unknown command: " + command, this.ConnectionId); - } - } - - protected void DispatchMessage(MessageDispatch dispatch) - { - lock(dispatchers.SyncRoot) - { - if(dispatchers.Contains(dispatch.ConsumerId)) - { - IDispatcher dispatcher = (IDispatcher)dispatchers[dispatch.ConsumerId]; - - // Can be null when a consumer has sent a MessagePull and there was - // no available message at the broker to dispatch or when signalled - // that the end of a Queue browse has been reached. - if(dispatch.Message != null) - { - dispatch.Message.ReadOnlyBody = true; - dispatch.Message.ReadOnlyProperties = true; - dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter; - } - - dispatcher.Dispatch(dispatch); - - return; - } - } - - Tracer.ErrorFormat("Connection[{0}]: No such consumer active: " + dispatch.ConsumerId, this.ConnectionId); - } - - protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info) - { - try - { - if (connected.Value) - { - info.ResponseRequired = false; - transport.Oneway(info); - } - } - catch(Exception ex) - { - if(!closing.Value && !closed.Value) - { - OnException(ex); - } - } - } - - internal void OnAsyncException(Exception error) - { - if(!this.closed.Value && !this.closing.Value) - { - if(this.ExceptionListener != null) - { - if(!(error is NMSException)) - { - error = NMSExceptionSupport.Create(error); - } - NMSException e = (NMSException)error; - - // Called in another thread so that processing can continue - // here, ensures no lock contention. 
- executor.QueueUserWorkItem(AsyncCallExceptionListener, e); - } - else - { - Tracer.DebugFormat("Connection[{0}]: Async exception with no exception listener: " + error, this.ConnectionId); - } - } - } - - private void AsyncCallExceptionListener(object error) - { - NMSException exception = error as NMSException; - this.ExceptionListener(exception); - } - - internal void OnTransportException(ITransport source, Exception cause) - { - this.OnException(cause); - } - - internal void OnException(Exception error) - { - // Will fire an exception listener callback if there's any set. - OnAsyncException(error); - - if(!this.closing.Value && !this.closed.Value) - { - // Perform the actual work in another thread to avoid lock contention - // and allow the caller to continue on in its error cleanup. - executor.QueueUserWorkItem(AsyncOnExceptionHandler, error); - } - } - - private void AsyncOnExceptionHandler(object error) - { - Exception cause = error as Exception; - - MarkTransportFailed(cause); - - try - { - this.transport.Dispose(); - } - catch(Exception ex) - { - Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Transport: " + ex, this.ConnectionId); - } - - this.brokerInfoReceived.countDown(); - - IList sessionsCopy = null; - lock(this.sessions.SyncRoot) - { - sessionsCopy = new ArrayList(this.sessions); - } - - // Use a copy so we don't concurrently modify the Sessions list if the - // client is closing at the same time. 
- foreach(Session session in sessionsCopy) - { - try - { - session.Shutdown(); - } - catch(Exception ex) - { - Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Sessions: " + ex, this.ConnectionId); - } - } - } - - private void MarkTransportFailed(Exception error) - { - this.transportFailed.Value = true; - if(this.firstFailureError == null) - { - this.firstFailureError = error; - } - } - - protected void OnTransportInterrupted(ITransport sender) - { - Tracer.Debug("Connection: Transport has been Interrupted."); - - // Ensure that if there's an advisory consumer we don't add it to the - // set of consumers that need interruption processing. - this.transportInterruptionProcessingComplete = - new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0)); - - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: " + dispatchers.Count, this.ConnectionId); - } - - SignalInterruptionProcessingNeeded(); - - foreach(Session session in this.sessions) - { - try - { - session.ClearMessagesInProgress(); - } - catch(Exception ex) - { - Tracer.WarnFormat("Connection[{0}]: Exception while clearing messages: " + ex.Message, this.ConnectionId); - Tracer.Warn(ex.StackTrace); - } - } - - if(this.ConnectionInterruptedListener != null && !this.closing.Value) - { - try - { - this.ConnectionInterruptedListener(); - } - catch - { - } - } - } - - protected void OnTransportResumed(ITransport sender) - { - Tracer.Debug("Transport has resumed normal operation."); - - if(this.ConnectionResumedListener != null && !this.closing.Value) - { - try - { - this.ConnectionResumedListener(); - } - catch - { - } - } - } - - internal void OnSessionException(Session sender, Exception exception) - { - if(ExceptionListener != null) - { - try - { - ExceptionListener(exception); - } - catch - { - sender.Close(); - } - } - } - - /// - /// Creates a new local transaction ID - /// - public LocalTransactionId 
CreateLocalTransactionId() - { - LocalTransactionId id = new LocalTransactionId(); - id.ConnectionId = ConnectionId; - id.Value = Interlocked.Increment(ref localTransactionCounter); - return id; - } - - protected SessionId NextSessionId - { - get { return new SessionId(this.info.ConnectionId, Interlocked.Increment(ref this.sessionCounter)); } - } - - public ActiveMQTempDestination CreateTemporaryDestination(bool topic) - { - ActiveMQTempDestination destination = null; - - if(topic) - { - destination = new ActiveMQTempTopic( - info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); - } - else - { - destination = new ActiveMQTempQueue( - info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); - } - - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = ConnectionId; - command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add - command.Destination = destination; - - this.SyncRequest(command); - - destination = this.AddTempDestination(destination); - destination.Connection = this; - - return destination; - } - - protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) - { - } - - public void DeleteTemporaryDestination(IDestination destination) - { - CheckClosedOrFailed(); - - ActiveMQTempDestination temp = destination as ActiveMQTempDestination; - - foreach(Session session in this.sessions) - { - if(session.IsInUse(temp)) - { - throw new NMSException("A consumer is consuming from the temporary destination"); - } - } - - this.tempDests.Remove(destination as ActiveMQTempDestination); - this.DeleteDestination(destination); - } - - public void DeleteDestination(IDestination destination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = this.ConnectionId; - command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove - command.Destination = (ActiveMQDestination)destination; - - this.Oneway(command); - } - - 
private void WaitForTransportInterruptionProcessingToComplete() - { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if(cdl != null) - { - if(!closed.Value && cdl.Remaining > 0) - { - Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " + - "processing (" + cdl.Remaining + ") to complete..", this.ConnectionId); - cdl.await(TimeSpan.FromSeconds(10)); - } - - SignalInterruptionProcessingComplete(); - } - } - - internal void TransportInterruptionProcessingComplete() - { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if(cdl != null) - { - cdl.countDown(); - try - { - SignalInterruptionProcessingComplete(); - } - catch - { - } - } - } - - private void SignalInterruptionProcessingComplete() - { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if(cdl.Remaining == 0) - { - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId); - } - - this.transportInterruptionProcessingComplete = null; - - FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; - if(failoverTransport != null) - { - failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId); - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: notified failover transport (" + failoverTransport + - ") of interruption completion.", this.ConnectionId); - } - } - } - } - - private void SignalInterruptionProcessingNeeded() - { - FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport; - - if(failoverTransport != null) - { - failoverTransport.StateTracker.TransportInterrupted(this.info.ConnectionId); - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Connection[{0}]: notified failover transport (" + failoverTransport + - ") of pending interruption processing.", this.ConnectionId); - } - } - } - - 
internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest) - { - ActiveMQTempDestination addedDest = dest; - - // .NET lacks a putIfAbsent operation for Maps. - lock(tempDests.SyncRoot) - { - if(!this.tempDests.Contains(dest)) - { - this.tempDests.Add(dest, dest); - } - else - { - addedDest = this.tempDests[dest] as ActiveMQTempDestination; - } - } - - return addedDest; - } - - internal void RemoveTempDestination(ActiveMQTempDestination dest) - { - this.tempDests.Remove(dest); - } - - internal bool IsTempDestinationActive(ActiveMQTempDestination dest) - { - if(this.advisoryConsumer == null) - { - return true; - } - - return this.tempDests.Contains(dest); - } - - protected void CheckClosedOrFailed() - { - CheckClosed(); - if(transportFailed.Value) - { - throw new ConnectionFailedException(firstFailureError.Message); - } - } - - protected void CheckClosed() - { - if(closed.Value) - { - throw new ConnectionClosedException(); - } - } - } + /// + /// Represents a connection with a message broker + /// + public class Connection : IConnection + { + private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); + + // Uri configurable options. 
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; + private bool asyncSend = false; + private bool alwaysSyncSend = false; + private bool asyncClose = true; + private bool useCompression = false; + private bool copyMessageOnSend = true; + private bool sendAcksAsync = false; + private bool dispatchAsync = true; + private int producerWindowSize = 0; + private bool messagePrioritySupported = true; + private bool watchTopicAdviosires = true; + + private bool userSpecifiedClientID; + private readonly Uri brokerUri; + private ITransport transport; + private readonly ConnectionInfo info; + private TimeSpan requestTimeout; // from connection factory + private BrokerInfo brokerInfo; // from broker + private readonly CountDownLatch brokerInfoReceived = new CountDownLatch(1); + private WireFormatInfo brokerWireFormatInfo; // from broker + private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); + private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); + private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable()); + private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable()); + private readonly object myLock = new object(); + private readonly Atomic connected = new Atomic(false); + private readonly Atomic closed = new Atomic(false); + private readonly Atomic closing = new Atomic(false); + private readonly Atomic transportFailed = new Atomic(false); + private Exception firstFailureError = null; + private int sessionCounter = 0; + private int temporaryDestinationCounter = 0; + private int localTransactionCounter; + private readonly Atomic started = new Atomic(false); + private ConnectionMetaData metaData = null; + private bool disposed = false; + private IRedeliveryPolicy redeliveryPolicy; + private PrefetchPolicy prefetchPolicy = new PrefetchPolicy(); + private ICompressionPolicy compressionPolicy = new CompressionPolicy(); + private readonly 
IdGenerator clientIdGenerator; + private int consumerIdCounter = 0; + private volatile CountDownLatch transportInterruptionProcessingComplete; + private readonly MessageTransformation messageTransformation; + private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor(); + private AdvisoryConsumer advisoryConsumer = null; + + public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator) + { + this.brokerUri = connectionUri; + this.clientIdGenerator = clientIdGenerator; + + this.transport = transport; + this.transport.Command = new CommandHandler(OnCommand); + this.transport.Exception = new ExceptionHandler(OnTransportException); + this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted); + this.transport.Resumed = new ResumedHandler(OnTransportResumed); + + ConnectionId id = new ConnectionId(); + id.Value = CONNECTION_ID_GENERATOR.GenerateId(); + + this.info = new ConnectionInfo(); + this.info.ConnectionId = id; + this.info.FaultTolerant = transport.IsFaultTolerant; + + this.messageTransformation = new ActiveMQMessageTransformation(this); + } + + ~Connection() + { + Dispose(false); + } + + /// + /// A delegate that can receive transport level exceptions. + /// + public event ExceptionListener ExceptionListener; + + /// + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been interrupted. + /// + public event ConnectionInterruptedListener ConnectionInterruptedListener; + + /// + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been resumed. 
+ /// + public event ConnectionResumedListener ConnectionResumedListener; + + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + private ProducerTransformerDelegate producerTransformer; + public ProducerTransformerDelegate ProducerTransformer + { + get { return this.producerTransformer; } + set { this.producerTransformer = value; } + } + + #region Properties + + public String UserName + { + get { return this.info.UserName; } + set { this.info.UserName = value; } + } + + public String Password + { + get { return this.info.Password; } + set { this.info.Password = value; } + } + + /// + /// This property indicates what version of the Protocol we are using to + /// communicate with the Broker, if not set we return the lowest version + /// number to indicate we support only the basic command set. + /// + public int ProtocolVersion + { + get + { + if(brokerWireFormatInfo != null) + { + return brokerWireFormatInfo.Version; + } + + return 1; + } + } + + /// + /// This property indicates whether or not async send is enabled. + /// + public bool AsyncSend + { + get { return asyncSend; } + set { asyncSend = value; } + } + + /// + /// This property indicates whether or not async close is enabled. + /// When the connection is closed, it will either send a synchronous + /// DisposeOf command to the broker and wait for confirmation (if true), + /// or it will send the DisposeOf command asynchronously. + /// + public bool AsyncClose + { + get { return asyncClose; } + set { asyncClose = value; } + } + + /// + /// This property indicates whether or not async sends are used for + /// message acknowledgement messages. Sending Acks async can improve + /// performance but may decrease reliability. 
+ /// + public bool SendAcksAsync + { + get { return sendAcksAsync; } + set { sendAcksAsync = value; } + } + + /// + /// This property sets the acknowledgment mode for the connection. + /// The URI parameter connection.ackmode can be set to a string value + /// that maps to the enumeration value. + /// + public string AckMode + { + set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); } + } + + /// + /// This property is the maximum number of bytes in memory that a producer will transmit + /// to a broker before waiting for acknowledgement messages from the broker that it has + /// accepted the previously sent messages. In other words, this how you configure the + /// producer flow control window that is used for async sends where the client is responsible + /// for managing memory usage. The default value of 0 means no flow control at the client + /// + public int ProducerWindowSize + { + get { return producerWindowSize; } + set { producerWindowSize = value; } + } + + /// + /// This property forces all messages that are sent to be sent synchronously overriding + /// any usage of the AsyncSend flag. This can reduce performance in some cases since the + /// only messages we normally send synchronously are Persistent messages not sent in a + /// transaction. This options guarantees that no send will return until the broker has + /// acknowledge receipt of the message + /// + public bool AlwaysSyncSend + { + get { return alwaysSyncSend; } + set { alwaysSyncSend = value; } + } + + /// + /// This property indicates whether Message's should be copied before being sent via + /// one of the Connection's send methods. Copying the Message object allows the user + /// to resuse the Object over for another send. If the message isn't copied performance + /// can improve but the user must not reuse the Object as it may not have been sent + /// before they reset its payload. 
+ /// + public bool CopyMessageOnSend + { + get { return copyMessageOnSend; } + set { copyMessageOnSend = value; } + } + + /// + /// Enable or Disable the use of Compression on Message bodies. When enabled all + /// messages have their body compressed using the Deflate compression algorithm. + /// The recipient of the message must support the use of message compression as well + /// otherwise the receiving client will receive a message whose body appears in the + /// compressed form. + /// + public bool UseCompression + { + get { return this.useCompression; } + set { this.useCompression = value; } + } + + /// + /// Indicate whether or not the resources of this Connection should support the + /// Message Priority value of incoming messages and dispatch them accordingly. + /// When disabled Message are always dispatched to Consumers in FIFO order. + /// + public bool MessagePrioritySupported + { + get { return this.messagePrioritySupported; } + set { this.messagePrioritySupported = value; } + } + + public IConnectionMetaData MetaData + { + get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } + } + + public Uri BrokerUri + { + get { return brokerUri; } + } + + public ITransport ITransport + { + get { return transport; } + set { this.transport = value; } + } + + public bool TransportFailed + { + get { return this.transportFailed.Value; } + } + + public Exception FirstFailureError + { + get { return this.firstFailureError; } + } + + public TimeSpan RequestTimeout + { + get { return this.requestTimeout; } + set { this.requestTimeout = value; } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return acknowledgementMode; } + set { this.acknowledgementMode = value; } + } + + /// + /// synchronously or asynchronously by the broker. 
+ /// + public bool DispatchAsync + { + get { return this.dispatchAsync; } + set { this.dispatchAsync = value; } + } + + public bool WatchTopicAdvisories + { + get { return this.watchTopicAdviosires; } + set { this.watchTopicAdviosires = value; } + } + + public string ClientId + { + get { return info.ClientId; } + set + { + if(this.connected.Value) + { + throw new NMSException("You cannot change the ClientId once the Connection is connected"); + } + + this.info.ClientId = value; + this.userSpecifiedClientID = true; + CheckConnected(); + } + } + + /// + /// The Default Client Id used if the ClientId property is not set explicity. + /// + public string DefaultClientId + { + set + { + this.info.ClientId = value; + this.userSpecifiedClientID = true; + } + } + + public ConnectionId ConnectionId + { + get { return info.ConnectionId; } + } + + public BrokerInfo BrokerInfo + { + get { return brokerInfo; } + } + + public WireFormatInfo BrokerWireFormat + { + get { return brokerWireFormatInfo; } + } + + public String ResourceManagerId + { + get + { + this.brokerInfoReceived.await(); + return brokerInfo.BrokerId.Value; + } + } + + /// + /// Get/or set the redelivery policy for this connection. + /// + public IRedeliveryPolicy RedeliveryPolicy + { + get { return this.redeliveryPolicy; } + set { this.redeliveryPolicy = value; } + } + + public PrefetchPolicy PrefetchPolicy + { + get { return this.prefetchPolicy; } + set { this.prefetchPolicy = value; } + } + + public ICompressionPolicy CompressionPolicy + { + get { return this.compressionPolicy; } + set { this.compressionPolicy = value; } + } + + internal MessageTransformation MessageTransformation + { + get { return this.messageTransformation; } + } + + #endregion + + /// + /// Starts asynchronous message delivery of incoming messages for this connection. + /// Synchronous delivery is unaffected. 
+ /// + public void Start() + { + CheckConnected(); + if(started.CompareAndSet(false, true)) + { + lock(sessions.SyncRoot) + { + foreach(Session session in sessions) + { + session.Start(); + } + } + } + } + + /// + /// This property determines if the asynchronous message delivery of incoming + /// messages has been started for this connection. + /// + public bool IsStarted + { + get { return started.Value; } + } + + /// + /// Temporarily stop asynchronous delivery of inbound messages for this connection. + /// The sending of outbound messages is unaffected. + /// + public void Stop() + { + if(started.CompareAndSet(true, false)) + { + lock(sessions.SyncRoot) + { + foreach(Session session in sessions) + { + session.Stop(); + } + } + } + } + + /// + /// Creates a new session to work on this connection + /// + public ISession CreateSession() + { + return CreateAtiveMQSession(acknowledgementMode); + } + + /// + /// Creates a new session to work on this connection + /// + public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode) + { + return CreateAtiveMQSession(sessionAcknowledgementMode); + } + + protected virtual Session CreateAtiveMQSession(AcknowledgementMode ackMode) + { + CheckConnected(); + return new Session(this, NextSessionId, ackMode); + } + + internal void AddSession(Session session) + { + if(!this.closing.Value) + { + sessions.Add(session); + } + } + + internal void RemoveSession(Session session) + { + if(!this.closing.Value) + { + sessions.Remove(session); + } + } + + internal void addDispatcher(ConsumerId id, IDispatcher dispatcher) + { + if(!this.closing.Value) + { + this.dispatchers.Add(id, dispatcher); + } + } + + internal void removeDispatcher(ConsumerId id) + { + if(!this.closing.Value) + { + this.dispatchers.Remove(id); + } + } + + internal void addProducer(ProducerId id, MessageProducer producer) + { + if(!this.closing.Value) + { + this.producers.Add(id, producer); + } + } + + internal void removeProducer(ProducerId id) + { + 
if(!this.closing.Value) + { + this.producers.Remove(id); + } + } + + public void Close() + { + if(!this.closed.Value && !transportFailed.Value) + { + this.Stop(); + } + + lock(myLock) + { + if(this.closed.Value) + { + return; + } + + try + { + Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ConnectionId); + this.closing.Value = true; + + if(this.advisoryConsumer != null) + { + this.advisoryConsumer.Dispose(); + this.advisoryConsumer = null; + } + + lock(sessions.SyncRoot) + { + foreach(Session session in sessions) + { + session.Shutdown(); + } + } + sessions.Clear(); + + if(this.tempDests.Count > 0) + { + // Make a copy of the destinations to delete, because the act of deleting + // them will modify the collection. + ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count]; + + this.tempDests.Values.CopyTo(tempDestsToDelete, 0); + foreach(ActiveMQTempDestination dest in tempDestsToDelete) + { + dest.Delete(); + } + } + + // Connected is true only when we've successfully sent our ConnectionInfo + // to the broker, so if we haven't announced ourselves there's no need to + // inform the broker of a remove, and if the transport is failed, why bother. 
+ if(connected.Value && !transportFailed.Value) + { + DisposeOf(ConnectionId); + ShutdownInfo shutdowninfo = new ShutdownInfo(); + transport.Oneway(shutdowninfo); + } + + executor.Shutdown(); + + Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ConnectionId); + transport.Dispose(); + } + catch(Exception ex) + { + Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ConnectionId, ex); + } + finally + { + if(executor != null) + { + executor.Shutdown(); + } + + this.transport = null; + this.closed.Value = true; + this.connected.Value = false; + this.closing.Value = false; + } + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool disposing) + { + if(disposed) + { + return; + } + + if(disposing) + { + // Dispose managed code here. + } + + try + { + Close(); + } + catch + { + // Ignore network errors. + } + + disposed = true; + } + + // Implementation methods + + /// + /// Performs a synchronous request-response with the broker + /// + /// + public Response SyncRequest(Command command) + { + return SyncRequest(command, this.RequestTimeout); + } + + /// + /// Performs a synchronous request-response with the broker for requested timeout duration. 
+ /// + /// + /// + /// + public Response SyncRequest(Command command, TimeSpan requestTimeout) + { + CheckConnected(); + + try + { + Response response = transport.Request(command, requestTimeout); + if(response is ExceptionResponse) + { + ExceptionResponse exceptionResponse = (ExceptionResponse)response; + BrokerError brokerError = exceptionResponse.Exception; + throw new BrokerException(brokerError); + } + return response; + } + catch(Exception ex) + { + throw NMSExceptionSupport.Create(ex); + } + } + + public void Oneway(Command command) + { + CheckConnected(); + + try + { + transport.Oneway(command); + } + catch(Exception ex) + { + throw NMSExceptionSupport.Create(ex); + } + } + + private void DisposeOf(DataStructure objectId) + { + try + { + RemoveInfo command = new RemoveInfo(); + command.ObjectId = objectId; + if(asyncClose) + { + Tracer.DebugFormat("Connection[{0}]: Asynchronously disposing of Connection.", this.ConnectionId); + if(connected.Value) + { + transport.Oneway(command); + if (Tracer.IsDebugEnabled) + { + Tracer.DebugFormat("Connection[{0}]: Oneway command sent to broker: {1}", + this.ConnectionId, command); + } + } + } + else + { + // Ensure that the object is disposed to avoid potential race-conditions + // of trying to re-create the same object in the broker faster than + // the broker can dispose of the object. Allow up to 5 seconds to process. + Tracer.DebugFormat("Connection[{0}]: Synchronously disposing of Connection.", this.ConnectionId); + SyncRequest(command, TimeSpan.FromSeconds(5)); + Tracer.DebugFormat("Connection[{0}]: Synchronously closed of Connection.", this.ConnectionId); + } + } + catch // (BrokerException) + { + // Ignore exceptions while shutting down. + } + } + + private object checkConnectedLock = new object(); + + /// + /// Check and ensure that the connection objcet is connected. If it is not + /// connected or is closed, a ConnectionClosedException is thrown. 
+ /// + internal void CheckConnected() + { + if(closed.Value) + { + throw new ConnectionClosedException(); + } + + if(!connected.Value) + { + DateTime timeoutTime = DateTime.Now + this.RequestTimeout; + int waitCount = 1; + + while(true) + { + if(Monitor.TryEnter(checkConnectedLock)) + { + try + { + if(!connected.Value) + { + if(!this.userSpecifiedClientID) + { + this.info.ClientId = this.clientIdGenerator.GenerateId(); + } + + try + { + if(null != transport) + { + // Send the connection and see if an ack/nak is returned. + Response response = transport.Request(this.info, this.RequestTimeout); + if(!(response is ExceptionResponse)) + { + connected.Value = true; + if(this.watchTopicAdviosires) + { + ConsumerId id = new ConsumerId( + new SessionId(info.ConnectionId, -1), + Interlocked.Increment(ref this.consumerIdCounter)); + this.advisoryConsumer = new AdvisoryConsumer(this, id); + } + } + } + } + catch + { + } + } + } + finally + { + Monitor.Exit(checkConnectedLock); + } + } + + if(connected.Value || DateTime.Now > timeoutTime) + { + break; + } + + // Back off from being overly aggressive. Having too many threads + // aggressively trying to connect to a down broker pegs the CPU. 
+ Thread.Sleep(5 * (waitCount++)); + } + + if(!connected.Value) + { + throw new ConnectionClosedException(); + } + } + } + + /// + /// Handle incoming commands + /// + /// An ITransport + /// A Command + protected void OnCommand(ITransport commandTransport, Command command) + { + if(command.IsMessageDispatch) + { + WaitForTransportInterruptionProcessingToComplete(); + DispatchMessage((MessageDispatch)command); + } + else if(command.IsKeepAliveInfo) + { + OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command); + } + else if(command.IsWireFormatInfo) + { + this.brokerWireFormatInfo = (WireFormatInfo)command; + } + else if(command.IsBrokerInfo) + { + this.brokerInfo = (BrokerInfo)command; + this.brokerInfoReceived.countDown(); + } + else if(command.IsShutdownInfo) + { + // Only terminate the connection if the transport we use is not fault + // tolerant otherwise we let the transport deal with the broker closing + // our connection and deal with IOException if it is sent to use. + if(!closing.Value && !closed.Value && this.transport != null && !this.transport.IsFaultTolerant) + { + OnException(new NMSException("Broker closed this connection via Shutdown command.")); + } + } + else if(command.IsProducerAck) + { + ProducerAck ack = (ProducerAck)command as ProducerAck; + if(ack.ProducerId != null) + { + MessageProducer producer = producers[ack.ProducerId] as MessageProducer; + if(producer != null) + { + if(Tracer.IsDebugEnabled) + { + Tracer.DebugFormat("Connection[{0}]: Received a new ProducerAck -> ", + this.ConnectionId, ack); + } + + producer.OnProducerAck(ack); + } + } + } + else if(command.IsConnectionError) + { + if(!closing.Value && !closed.Value) + { + ConnectionError connectionError = (ConnectionError)command; + BrokerError brokerError = connectionError.Exception; + string message = "Broker connection error."; + string cause = ""; + + if(null != brokerError) + { + message = brokerError.Message; + if(null != brokerError.Cause) + { + cause = 
brokerError.Cause.Message; + } + } + + Tracer.ErrorFormat("Connection[{0}]: ConnectionError: " + message + " : " + cause, this.ConnectionId); + OnAsyncException(new NMSConnectionException(message, cause)); + } + } + else + { + Tracer.ErrorFormat("Connection[{0}]: Unknown command: " + command, this.ConnectionId); + } + } + + protected void DispatchMessage(MessageDispatch dispatch) + { + lock(dispatchers.SyncRoot) + { + if(dispatchers.Contains(dispatch.ConsumerId)) + { + IDispatcher dispatcher = (IDispatcher)dispatchers[dispatch.ConsumerId]; + + // Can be null when a consumer has sent a MessagePull and there was + // no available message at the broker to dispatch or when signalled + // that the end of a Queue browse has been reached. + if(dispatch.Message != null) + { + dispatch.Message.ReadOnlyBody = true; + dispatch.Message.ReadOnlyProperties = true; + dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter; + } + + dispatcher.Dispatch(dispatch); + + return; + } + } + + Tracer.ErrorFormat("Connection[{0}]: No such consumer active: " + dispatch.ConsumerId, this.ConnectionId); + } + + protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info) + { + try + { + if (connected.Value) + { + info.ResponseRequired = false; + transport.Oneway(info); + } + } + catch(Exception ex) + { + if(!closing.Value && !closed.Value) + { + OnException(ex); + } + } + } + + internal void OnAsyncException(Exception error) + { + if(!this.closed.Value && !this.closing.Value) + { + if(this.ExceptionListener != null) + { + if(!(error is NMSException)) + { + error = NMSExceptionSupport.Create(error); + } + NMSException e = (NMSException)error; + + // Called in another thread so that processing can continue + // here, ensures no lock contention. 
+ executor.QueueUserWorkItem(AsyncCallExceptionListener, e); + } + else + { + Tracer.DebugFormat("Connection[{0}]: Async exception with no exception listener: " + error, this.ConnectionId); + } + } + } + + private void AsyncCallExceptionListener(object error) + { + NMSException exception = error as NMSException; + this.ExceptionListener(exception); + } + + internal void OnTransportException(ITransport source, Exception cause) + { + this.OnException(cause); + } + + internal void OnException(Exception error) + { + // Will fire an exception listener callback if there's any set. + OnAsyncException(error); + + if(!this.closing.Value && !this.closed.Value) + { + // Perform the actual work in another thread to avoid lock contention + // and allow the caller to continue on in its error cleanup. + executor.QueueUserWorkItem(AsyncOnExceptionHandler, error); + } + } + + private void AsyncOnExceptionHandler(object error) + { + Exception cause = error as Exception; + + MarkTransportFailed(cause); + + try + { + this.transport.Dispose(); + } + catch(Exception ex) + { + Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Transport: " + ex, this.ConnectionId); + } + + this.brokerInfoReceived.countDown(); + + IList sessionsCopy = null; + lock(this.sessions.SyncRoot) + { + sessionsCopy = new ArrayList(this.sessions); + } + + // Use a copy so we don't concurrently modify the Sessions list if the + // client is closing at the same time. 
+ foreach(Session session in sessionsCopy) + { + try + { + session.Shutdown(); + } + catch(Exception ex) + { + Tracer.DebugFormat("Connection[{0}]: Caught Exception While disposing of Sessions: " + ex, this.ConnectionId); + } + } + } + + private void MarkTransportFailed(Exception error) + { + this.transportFailed.Value = true; + if(this.firstFailureError == null) + { + this.firstFailureError = error; + } + } + + protected void OnTransportInterrupted(ITransport sender) + { + Tracer.Debug("Connection: Transport has been Interrupted."); + + // Ensure that if there's an advisory consumer we don't add it to the + // set of consumers that need interruption processing. + this.transportInterruptionProcessingComplete = + new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0)); + + if(Tracer.IsDebugEnabled) + { + Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: " + dispatchers.Count, this.ConnectionId); + } + + SignalInterruptionProcessingNeeded(); + + foreach(Session session in this.sessions) + { + try + { + session.ClearMessagesInProgress(); + } + catch(Exception ex) + { + Tracer.WarnFormat("Connection[{0}]: Exception while clearing messages: " + ex.Message, this.ConnectionId); + Tracer.Warn(ex.StackTrace); + } + } + + if(this.ConnectionInterruptedListener != null && !this.closing.Value) + { + try + { + this.ConnectionInterruptedListener(); + } + catch + { + } + } + } + + protected void OnTransportResumed(ITransport sender) + { + Tracer.Debug("Transport has resumed normal operation."); + + if(this.ConnectionResumedListener != null && !this.closing.Value) + { + try + { + this.ConnectionResumedListener(); + } + catch + { + } + } + } + + internal void OnSessionException(Session sender, Exception exception) + { + if(ExceptionListener != null) + { + try + { + ExceptionListener(exception); + } + catch + { + sender.Close(); + } + } + } + + /// + /// Creates a new local transaction ID + /// + public LocalTransactionId 
CreateLocalTransactionId() + { + LocalTransactionId id = new LocalTransactionId(); + id.ConnectionId = ConnectionId; + id.Value = Interlocked.Increment(ref localTransactionCounter); + return id; + } + + protected SessionId NextSessionId + { + get { return new SessionId(this.info.ConnectionId, Interlocked.Increment(ref this.sessionCounter)); } + } + + public ActiveMQTempDestination CreateTemporaryDestination(bool topic) + { + ActiveMQTempDestination destination = null; + + if(topic) + { + destination = new ActiveMQTempTopic( + info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); + } + else + { + destination = new ActiveMQTempQueue( + info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); + } + + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = ConnectionId; + command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add + command.Destination = destination; + + this.SyncRequest(command); + + destination = this.AddTempDestination(destination); + destination.Connection = this; + + return destination; + } + + protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) + { + } + + public void DeleteTemporaryDestination(IDestination destination) + { + CheckClosedOrFailed(); + + ActiveMQTempDestination temp = destination as ActiveMQTempDestination; + + foreach(Session session in this.sessions) + { + if(session.IsInUse(temp)) + { + throw new NMSException("A consumer is consuming from the temporary destination"); + } + } + + this.tempDests.Remove(destination as ActiveMQTempDestination); + this.DeleteDestination(destination); + } + + public void DeleteDestination(IDestination destination) + { + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = this.ConnectionId; + command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove + command.Destination = (ActiveMQDestination)destination; + + this.Oneway(command); + } + + 
/// <summary>
/// If an interruption-processing latch is currently set, optionally waits on
/// it and then attempts to signal that interruption processing is complete.
/// The wait only happens when the connection is not closed and the latch
/// still has a remaining count.
/// </summary>
private void WaitForTransportInterruptionProcessingToComplete()
{
    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
    if(cdl == null)
    {
        return;
    }

    if(!closed.Value && cdl.Remaining > 0)
    {
        Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " +
                          "processing ({1}) to complete..", this.ConnectionId, cdl.Remaining);
        // NOTE(review): the 10 second TimeSpan is presumably a timeout - confirm against CountDownLatch.await.
        cdl.await(TimeSpan.FromSeconds(10));
    }

    SignalInterruptionProcessingComplete();
}

/// <summary>
/// Counts down the interruption-processing latch, if one is set, and then
/// attempts to signal completion. Failures while signalling are logged and
/// not propagated to the caller.
/// </summary>
internal void TransportInterruptionProcessingComplete()
{
    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
    if(cdl == null)
    {
        return;
    }

    cdl.countDown();

    try
    {
        SignalInterruptionProcessingComplete();
    }
    catch(Exception ex)
    {
        // Still best-effort, but no longer silent: record why signalling failed.
        Tracer.WarnFormat("Connection[{0}]: failed to signal interruption processing completion: {1}",
                          this.ConnectionId, ex.Message);
    }
}

/// <summary>
/// When the interruption-processing latch has reached zero, clears the latch
/// field and, if the transport narrows to a FailoverTransport, notifies it
/// that interruption processing for this connection is complete.
/// Does nothing when no latch is set or its count is still non-zero.
/// </summary>
private void SignalInterruptionProcessingComplete()
{
    // The field is re-read here and this method itself resets it to null,
    // so it can already be null by the time we get here.
    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
    if(cdl == null || cdl.Remaining != 0)
    {
        return;
    }

    if(Tracer.IsDebugEnabled)
    {
        Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId);
    }

    this.transportInterruptionProcessingComplete = null;

    FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
    if(failoverTransport != null)
    {
        failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
        if(Tracer.IsDebugEnabled)
        {
            // Pass the transport as a format argument so braces in its
            // ToString() output cannot corrupt the format string.
            Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1}) of interruption completion.",
                               this.ConnectionId, failoverTransport);
        }
    }
}

/// <summary>
/// If the transport narrows to a FailoverTransport, tells its state tracker
/// that the transport for this connection was interrupted.
/// </summary>
private void SignalInterruptionProcessingNeeded()
{
    FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
    if(failoverTransport == null)
    {
        return;
    }

    failoverTransport.StateTracker.TransportInterrupted(this.info.ConnectionId);
    if(Tracer.IsDebugEnabled)
    {
        // Transport passed as an argument rather than concatenated into the format string.
        Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1}) of pending interruption processing.",
                           this.ConnectionId, failoverTransport);
    }
}

/// <summary>
/// Registers a temporary destination in the tracking table unless an entry
/// for it already exists, in which case the existing entry is returned.
/// </summary>
/// <param name="dest">The temporary destination to register.</param>
/// <returns>
/// The given destination when it was newly added; otherwise the value already
/// stored under that key, viewed as an ActiveMQTempDestination.
/// </returns>
internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest)
{
    // .NET lacks a putIfAbsent operation for Maps, so the check and the
    // insert are done together under the table's SyncRoot.
    lock(tempDests.SyncRoot)
    {
        if(this.tempDests.Contains(dest))
        {
            return this.tempDests[dest] as ActiveMQTempDestination;
        }

        this.tempDests.Add(dest, dest);
        return dest;
    }
}

/// <summary>
/// Drops the given temporary destination from the tracking table.
/// </summary>
/// <param name="dest">The temporary destination to forget.</param>
internal void RemoveTempDestination(ActiveMQTempDestination dest)
{
    this.tempDests.Remove(dest);
}

/// <summary>
/// Reports whether a temporary destination is considered active.
/// </summary>
/// <param name="dest">The temporary destination to look up.</param>
/// <returns>
/// true when there is no advisory consumer, otherwise whether the tracking
/// table contains the destination.
/// </returns>
internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
{
    return this.advisoryConsumer == null || this.tempDests.Contains(dest);
}

/// <summary>
/// Runs the closed check and then throws a ConnectionFailedException,
/// carrying the first failure's message, if the transport has failed.
/// </summary>
protected void CheckClosedOrFailed()
{
    CheckClosed();

    if(!transportFailed.Value)
    {
        return;
    }

    throw new ConnectionFailedException(firstFailureError.Message);
}

/// <summary>
/// Throws a ConnectionClosedException when this connection is flagged as closed.
/// </summary>
protected void CheckClosed()
{
    if(!closed.Value)
    {
        return;
    }

    throw new ConnectionClosedException();
}
} // class Connection
} // namespace Apache.NMS.ActiveMQ