Author: jgomes
Date: Mon Aug 30 18:04:21 2010
New Revision: 990885
URL: http://svn.apache.org/viewvc?rev=990885&view=rev
Log:
Implement enhancements for EMS provider.
Fixes [AMQNET-271]. (See https://issues.apache.org/activemq/browse/AMQNET-271)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/ISession.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Mon Aug 30 18:04:21 2010
@@ -39,25 +39,25 @@ namespace Apache.NMS.ActiveMQ
private Uri brokerUri;
private string connectionUserName;
private string connectionPassword;
- private string clientId;
- private string clientIdPrefix;
- private IdGenerator clientIdGenerator;
-
- private bool useCompression;
- private bool copyMessageOnSend = true;
- private bool dispatchAsync = true;
- private bool asyncSend;
- private bool asyncClose;
- private bool alwaysSyncSend;
- private bool sendAcksAsync = true;
+ private string clientId;
+ private string clientIdPrefix;
+ private IdGenerator clientIdGenerator;
+
+ private bool useCompression;
+ private bool copyMessageOnSend = true;
+ private bool dispatchAsync = true;
+ private bool asyncSend;
+ private bool asyncClose;
+ private bool alwaysSyncSend;
+ private bool sendAcksAsync = true;
private int producerWindowSize = 0;
- private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- private bool messagePrioritySupported=true;
+ private bool messagePrioritySupported=true;
- private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
- private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
- private ICompressionPolicy compressionPolicy = new CompressionPolicy();
+ private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+ private ICompressionPolicy compressionPolicy = new CompressionPolicy();
static ConnectionFactory()
{
@@ -106,57 +106,57 @@ namespace Apache.NMS.ActiveMQ
public IConnection CreateConnection(string userName, string password)
{
- Connection connection = null;
+ Connection connection = null;
- try
- {
- Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
-
- ITransport transport = TransportFactory.CreateTransport(brokerUri);
-
- connection = new Connection(brokerUri, transport, this.ClientIdGenerator);
-
- ConfigureConnection(connection);
-
- connection.UserName = userName;
- connection.Password = password;
-
- if(this.clientId != null)
- {
- connection.DefaultClientId = this.clientId;
- }
-
- connection.ITransport.Start();
-
- return connection;
- }
- catch(NMSException e)
- {
- try
- {
- connection.Close();
- }
- catch
- {
- }
-
- throw e;
- }
- catch(Exception e)
- {
- try
- {
- connection.Close();
- }
- catch
- {
- }
+ try
+ {
+ Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
+
+ ITransport transport = TransportFactory.CreateTransport(brokerUri);
+
+ connection = new Connection(brokerUri, transport, this.ClientIdGenerator);
+
+ ConfigureConnection(connection);
- throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
- }
+ connection.UserName = userName;
+ connection.Password = password;
+
+ if(this.clientId != null)
+ {
+ connection.DefaultClientId = this.clientId;
+ }
+
+ connection.ITransport.Start();
+
+ return connection;
+ }
+ catch(NMSException e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw e;
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
+ }
}
- #region ConnectionFactory Properties
+ #region ConnectionFactory Properties
/// <summary>
/// Get/or set the broker Uri.
@@ -165,30 +165,30 @@ namespace Apache.NMS.ActiveMQ
{
get { return brokerUri; }
set
- {
- brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:"));
+ {
+ brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:"));
- if(brokerUri.Query != null)
- {
- StringDictionary properties = URISupport.ParseQuery(brokerUri.Query);
-
- StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
- StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
-
- if(connection != null)
- {
- URISupport.SetProperties(this, connection, "connection.");
- }
-
- if(nms != null)
- {
- URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
- URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
- }
-
- brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
- }
- }
+ if(brokerUri.Query != null)
+ {
+ StringDictionary properties = URISupport.ParseQuery(brokerUri.Query);
+
+ StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
+ StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
+
+ if(connection != null)
+ {
+ URISupport.SetProperties(this, connection, "connection.");
+ }
+
+ if(nms != null)
+ {
+ URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
+ URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
+ }
+
+ brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
+ }
+ }
}
public string UserName
@@ -209,59 +209,59 @@ namespace Apache.NMS.ActiveMQ
set { clientId = value; }
}
- public string ClientIdPrefix
- {
- get { return clientIdPrefix; }
- set { clientIdPrefix = value; }
- }
-
- public bool UseCompression
- {
- get { return this.useCompression; }
- set { this.useCompression = value; }
- }
-
- public bool CopyMessageOnSend
- {
- get { return copyMessageOnSend; }
- set { copyMessageOnSend = value; }
- }
-
- public bool AlwaysSyncSend
- {
- get { return alwaysSyncSend; }
- set { alwaysSyncSend = value; }
- }
-
- public bool AsyncClose
- {
- get { return asyncClose; }
- set { asyncClose = value; }
- }
-
- public bool SendAcksAsync
- {
- get { return sendAcksAsync; }
- set { sendAcksAsync = value; }
- }
-
- public bool AsyncSend
- {
- get { return asyncSend; }
- set { asyncSend = value; }
- }
-
- public bool DispatchAsync
- {
- get { return this.dispatchAsync; }
- set { this.dispatchAsync = value; }
- }
-
- public bool MessagePrioritySupported
- {
- get { return this.messagePrioritySupported; }
- set { this.messagePrioritySupported = value; }
- }
+ public string ClientIdPrefix
+ {
+ get { return clientIdPrefix; }
+ set { clientIdPrefix = value; }
+ }
+
+ public bool UseCompression
+ {
+ get { return this.useCompression; }
+ set { this.useCompression = value; }
+ }
+
+ public bool CopyMessageOnSend
+ {
+ get { return copyMessageOnSend; }
+ set { copyMessageOnSend = value; }
+ }
+
+ public bool AlwaysSyncSend
+ {
+ get { return alwaysSyncSend; }
+ set { alwaysSyncSend = value; }
+ }
+
+ public bool AsyncClose
+ {
+ get { return asyncClose; }
+ set { asyncClose = value; }
+ }
+
+ public bool SendAcksAsync
+ {
+ get { return sendAcksAsync; }
+ set { sendAcksAsync = value; }
+ }
+
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
+ public bool DispatchAsync
+ {
+ get { return this.dispatchAsync; }
+ set { this.dispatchAsync = value; }
+ }
+
+ public bool MessagePrioritySupported
+ {
+ get { return this.messagePrioritySupported; }
+ set { this.messagePrioritySupported = value; }
+ }
public int RequestTimeout
{
@@ -269,76 +269,76 @@ namespace Apache.NMS.ActiveMQ
set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
}
- public string AckMode
- {
- set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
- }
-
- public AcknowledgementMode AcknowledgementMode
- {
- get { return acknowledgementMode; }
- set { this.acknowledgementMode = value; }
- }
-
- public int ProducerWindowSize
- {
- get { return producerWindowSize; }
- set { producerWindowSize = value; }
- }
-
- public PrefetchPolicy PrefetchPolicy
- {
- get { return this.prefetchPolicy; }
- set { this.prefetchPolicy = value; }
- }
-
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get { return this.redeliveryPolicy; }
- set
- {
- if(value != null)
- {
- this.redeliveryPolicy = value;
- }
- }
- }
-
- public ICompressionPolicy CompressionPolicy
- {
- get { return this.compressionPolicy; }
- set
- {
- if(value != null)
- {
- this.compressionPolicy = value;
- }
- }
- }
-
- public IdGenerator ClientIdGenerator
- {
- set { this.clientIdGenerator = value; }
- get
- {
- lock(this)
- {
- if(this.clientIdGenerator == null)
- {
- if(this.clientIdPrefix != null)
- {
- this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
- }
- else
- {
- this.clientIdGenerator = new IdGenerator();
- }
- }
-
- return this.clientIdGenerator;
- }
- }
- }
+ public string AckMode
+ {
+ set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { this.acknowledgementMode = value; }
+ }
+
+ public int ProducerWindowSize
+ {
+ get { return producerWindowSize; }
+ set { producerWindowSize = value; }
+ }
+
+ public PrefetchPolicy PrefetchPolicy
+ {
+ get { return this.prefetchPolicy; }
+ set { this.prefetchPolicy = value; }
+ }
+
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set
+ {
+ if(value != null)
+ {
+ this.redeliveryPolicy = value;
+ }
+ }
+ }
+
+ public ICompressionPolicy CompressionPolicy
+ {
+ get { return this.compressionPolicy; }
+ set
+ {
+ if(value != null)
+ {
+ this.compressionPolicy = value;
+ }
+ }
+ }
+
+ public IdGenerator ClientIdGenerator
+ {
+ set { this.clientIdGenerator = value; }
+ get
+ {
+ lock(this)
+ {
+ if(this.clientIdGenerator == null)
+ {
+ if(this.clientIdPrefix != null)
+ {
+ this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+ }
+ else
+ {
+ this.clientIdGenerator = new IdGenerator();
+ }
+ }
+
+ return this.clientIdGenerator;
+ }
+ }
+ }
public event ExceptionListener OnException
{
@@ -353,6 +353,13 @@ namespace Apache.NMS.ActiveMQ
}
private ConsumerTransformerDelegate consumerTransformer;
+ /// <summary>
+ /// A Delegate that is called each time a Message is dispatched to allow the client to do
+ /// any necessary transformations on the received message before it is delivered. The
+ /// ConnectionFactory sets the provided delegate instance on each Connection instance that
+ /// is created from this factory, each connection in turn passes the delegate along to each
+ /// Session it creates which then passes that along to the Consumers it creates.
+ /// </summary>
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
@@ -360,33 +367,40 @@ namespace Apache.NMS.ActiveMQ
}
private ProducerTransformerDelegate producerTransformer;
+ /// <summary>
+ /// A delegate that is called each time a Message is sent from this Producer which allows
+ /// the application to perform any needed transformations on the Message before it is sent.
+ /// The ConnectionFactory sets the provided delegate instance on each Connection instance that
+ /// is created from this factory, each connection in turn passes the delegate along to each
+ /// Session it creates which then passes that along to the Producers it creates.
+ /// </summary>
public ProducerTransformerDelegate ProducerTransformer
{
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}
-
- #endregion
- protected virtual void ConfigureConnection(Connection connection)
- {
- connection.AsyncClose = this.AsyncClose;
- connection.AsyncSend = this.AsyncSend;
- connection.CopyMessageOnSend = this.CopyMessageOnSend;
- connection.AlwaysSyncSend = this.AlwaysSyncSend;
- connection.DispatchAsync = this.DispatchAsync;
- connection.SendAcksAsync = this.SendAcksAsync;
- connection.AcknowledgementMode = this.acknowledgementMode;
- connection.UseCompression = this.useCompression;
+ #endregion
+
+ protected virtual void ConfigureConnection(Connection connection)
+ {
+ connection.AsyncClose = this.AsyncClose;
+ connection.AsyncSend = this.AsyncSend;
+ connection.CopyMessageOnSend = this.CopyMessageOnSend;
+ connection.AlwaysSyncSend = this.AlwaysSyncSend;
+ connection.DispatchAsync = this.DispatchAsync;
+ connection.SendAcksAsync = this.SendAcksAsync;
+ connection.AcknowledgementMode = this.acknowledgementMode;
+ connection.UseCompression = this.useCompression;
connection.RequestTimeout = this.requestTimeout;
connection.ProducerWindowSize = this.producerWindowSize;
- connection.MessagePrioritySupported = this.messagePrioritySupported;
- connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
- connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
- connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
+ connection.MessagePrioritySupported = this.messagePrioritySupported;
+ connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+ connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+ connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
- }
+ }
protected static void ExceptionHandler(Exception ex)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Mon Aug 30 18:04:21 2010
@@ -40,7 +40,7 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public class MessageConsumer : IMessageConsumer, IDispatcher
{
- private readonly MessageTransformation messageTransformation;
+ private readonly MessageTransformation messageTransformation;
private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
private readonly ConsumerInfo info;
@@ -60,7 +60,7 @@ namespace Apache.NMS.ActiveMQ
private int dispatchedCount = 0;
private volatile bool synchronizationRegistered = false;
private bool clearDispatchList = false;
- private bool inProgressClearRequiredFlag;
+ private bool inProgressClearRequiredFlag;
private const int DEFAULT_REDELIVERY_DELAY = 0;
private const int DEFAULT_MAX_REDELIVERIES = 5;
@@ -70,46 +70,46 @@ namespace Apache.NMS.ActiveMQ
private IRedeliveryPolicy redeliveryPolicy;
// Constructor internal to prevent clients from creating an instance.
- internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
- String name, String selector, int prefetch, int maxPendingMessageCount,
- bool noLocal, bool browser, bool dispatchAsync )
- {
- if(destination == null)
- {
- throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
- }
-
+ internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
+ String name, String selector, int prefetch, int maxPendingMessageCount,
+ bool noLocal, bool browser, bool dispatchAsync )
+ {
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+
this.session = session;
- this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
- this.messageTransformation = this.session.Connection.MessageTransformation;
+ this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+ this.messageTransformation = this.session.Connection.MessageTransformation;
- if(session.Connection.MessagePrioritySupported)
- {
- this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
- }
- else
- {
- this.unconsumedMessages = new FifoMessageDispatchChannel();
- }
-
- this.info = new ConsumerInfo();
- this.info.ConsumerId = id;
- this.info.Destination = destination;
- this.info.SubscriptionName = name;
- this.info.Selector = selector;
- this.info.PrefetchSize = prefetch;
- this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
- this.info.NoLocal = noLocal;
- this.info.Browser = browser;
- this.info.DispatchAsync = dispatchAsync;
- this.info.Retroactive = session.Retroactive;
- this.info.Exclusive = session.Exclusive;
- this.info.Priority = session.Priority;
-
- // If the destination contained a URI query, then use it to set public properties
- // on the ConsumerInfo
- if(destination.Options != null)
- {
+ if(session.Connection.MessagePrioritySupported)
+ {
+ this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
+ }
+ else
+ {
+ this.unconsumedMessages = new FifoMessageDispatchChannel();
+ }
+
+ this.info = new ConsumerInfo();
+ this.info.ConsumerId = id;
+ this.info.Destination = destination;
+ this.info.SubscriptionName = name;
+ this.info.Selector = selector;
+ this.info.PrefetchSize = prefetch;
+ this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
+ this.info.NoLocal = noLocal;
+ this.info.Browser = browser;
+ this.info.DispatchAsync = dispatchAsync;
+ this.info.Retroactive = session.Retroactive;
+ this.info.Exclusive = session.Exclusive;
+ this.info.Priority = session.Priority;
+
+ // If the destination contained a URI query, then use it to set public properties
+ // on the ConsumerInfo
+ if(destination.Options != null)
+ {
// Get options prefixed with "consumer.*"
StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
// Extract out custom extension options "consumer.nms.*"
@@ -117,7 +117,7 @@ namespace Apache.NMS.ActiveMQ
URISupport.SetProperties(this.info, options);
URISupport.SetProperties(this, customConsumerOptions, "nms.");
- }
+ }
}
~MessageConsumer()
@@ -137,10 +137,10 @@ namespace Apache.NMS.ActiveMQ
get { return this.info.ConsumerId; }
}
- public ConsumerInfo ConsumerInfo
- {
- get { return this.info; }
- }
+ public ConsumerInfo ConsumerInfo
+ {
+ get { return this.info; }
+ }
public int RedeliveryTimeout
{
@@ -158,11 +158,11 @@ namespace Apache.NMS.ActiveMQ
get { return this.redeliveryPolicy; }
set { this.redeliveryPolicy = value; }
}
-
- public long UnconsumedMessageCount
- {
- get { return this.unconsumedMessages.Count; }
- }
+
+ public long UnconsumedMessageCount
+ {
+ get { return this.unconsumedMessages.Count; }
+ }
// Custom Options
private bool ignoreExpiration = false;
@@ -171,17 +171,21 @@ namespace Apache.NMS.ActiveMQ
get { return ignoreExpiration; }
set { ignoreExpiration = value; }
}
-
+
#endregion
#region IMessageConsumer Members
- private ConsumerTransformerDelegate consumerTransformer;
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
+ private ConsumerTransformerDelegate consumerTransformer;
+ /// <summary>
+ /// A Delegate that is called each time a Message is dispatched to allow the client to do
+ /// any necessary transformations on the received message before it is delivered.
+ /// </summary>
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
public event MessageListener Listener
{
@@ -336,8 +340,8 @@ namespace Apache.NMS.ActiveMQ
{
if(!this.unconsumedMessages.Closed)
{
- Tracer.Debug("Closing down the Consumer");
-
+ Tracer.Debug("Closing down the Consumer");
+
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now.
if(!this.session.IsTransacted)
@@ -351,7 +355,7 @@ namespace Apache.NMS.ActiveMQ
if(!this.session.IsTransacted)
{
- lock(this.dispatchedMessages)
+ lock(this.dispatchedMessages)
{
dispatchedMessages.Clear();
}
@@ -360,15 +364,15 @@ namespace Apache.NMS.ActiveMQ
this.unconsumedMessages.Close();
this.session.RemoveConsumer(this.info.ConsumerId);
- RemoveInfo removeCommand = new RemoveInfo();
+ RemoveInfo removeCommand = new RemoveInfo();
removeCommand.ObjectId = this.info.ConsumerId;
removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
this.session.Connection.Oneway(removeCommand);
this.session = null;
- Tracer.Debug("Consumer instance Closed.");
- }
+ Tracer.Debug("Consumer instance Closed.");
+ }
}
#endregion
@@ -383,11 +387,11 @@ namespace Apache.NMS.ActiveMQ
messagePull.Timeout = timeout;
messagePull.ResponseRequired = false;
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Sending MessagePull: " + messagePull);
- }
-
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Sending MessagePull: " + messagePull);
+ }
+
session.Connection.Oneway(messagePull);
}
}
@@ -408,8 +412,8 @@ namespace Apache.NMS.ActiveMQ
}
}
}
-
- if(dispatch == null)
+
+ if(dispatch == null)
{
Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
return;
@@ -456,38 +460,38 @@ namespace Apache.NMS.ActiveMQ
this.unconsumedMessages.Stop();
}
- internal void InProgressClearRequired()
- {
- inProgressClearRequiredFlag = true;
- // deal with delivered messages async to avoid lock contention with in progress acks
- clearDispatchList = true;
- }
-
- internal void ClearMessagesInProgress()
- {
- if(inProgressClearRequiredFlag)
- {
- // Called from a thread in the ThreadPool, so we wait until we can
- // get a lock on the unconsumed list then we clear it.
- lock(this.unconsumedMessages)
- {
- if(inProgressClearRequiredFlag)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
- this.unconsumedMessages.Count + ") on transport interrupt");
- }
-
- this.unconsumedMessages.Clear();
-
- // allow dispatch on this connection to resume
- this.session.Connection.TransportInterruptionProcessingComplete();
- this.inProgressClearRequiredFlag = false;
- }
- }
- }
- }
+ internal void InProgressClearRequired()
+ {
+ inProgressClearRequiredFlag = true;
+ // deal with delivered messages async to avoid lock contention with in progress acks
+ clearDispatchList = true;
+ }
+
+ internal void ClearMessagesInProgress()
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ // Called from a thread in the ThreadPool, so we wait until we can
+ // get a lock on the unconsumed list then we clear it.
+ lock(this.unconsumedMessages)
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+ this.unconsumedMessages.Count + ") on transport interrupt");
+ }
+
+ this.unconsumedMessages.Clear();
+
+ // allow dispatch on this connection to resume
+ this.session.Connection.TransportInterruptionProcessingComplete();
+ this.inProgressClearRequiredFlag = false;
+ }
+ }
+ }
+ }
public void DeliverAcks()
{
@@ -555,10 +559,10 @@ namespace Apache.NMS.ActiveMQ
{
// on resumption a pending delivered ack will be out of sync with
// re-deliveries.
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
- }
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
+ }
this.pendingAck = null;
}
}
@@ -596,13 +600,13 @@ namespace Apache.NMS.ActiveMQ
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
- // If aborted we stop the abort here and let normal processing resume.
- // This allows the session to shutdown normally and ack all messages
- // that have outstanding acks in this consumer.
- if( (Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
- {
- Thread.ResetAbort();
- }
+ // If aborted we stop the abort here and let normal processing resume.
+ // This allows the session to shutdown normally and ack all messages
+ // that have outstanding acks in this consumer.
+ if( (Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
+ {
+ Thread.ResetAbort();
+ }
}
}
else
@@ -653,9 +657,9 @@ namespace Apache.NMS.ActiveMQ
/// <summary>
/// Used to get an enqueued message from the unconsumedMessages list. The
/// amount of time this method blocks is based on the timeout value. if
- /// timeout == Timeout.Infinite then it blocks until a message is received.
- /// if timeout == 0 then it it tries to not block at all, it returns a
- /// message if it is available if timeout > 0 then it blocks up to timeout
+ /// timeout == Timeout.Infinite then it blocks until a message is received.
+ /// if timeout == 0 then it it tries to not block at all, it returns a
+ /// message if it is available if timeout > 0 then it blocks up to timeout
/// amount of time. Expired messages will consumed by this method.
/// </summary>
/// <param name="timeout">
@@ -800,14 +804,14 @@ namespace Apache.NMS.ActiveMQ
else if(IsClientAcknowledge || IsIndividualAcknowledge)
{
bool messageAckedByConsumer = false;
-
+
lock(this.dispatchedMessages)
{
messageAckedByConsumer = this.dispatchedMessages.Contains(dispatch);
}
-
+
if(messageAckedByConsumer)
- {
+ {
AckLater(dispatch, AckType.DeliveredAck);
}
}
@@ -886,19 +890,19 @@ namespace Apache.NMS.ActiveMQ
// ack and hence important, send it now so it is not lost.
if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
{
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+ }
- this.session.Connection.Oneway(oldPendingAck);
+ this.session.Connection.Oneway(oldPendingAck);
}
else
{
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+ }
}
}
@@ -1081,7 +1085,7 @@ namespace Apache.NMS.ActiveMQ
message = this.messageTransformation.TransformMessage<ActiveMQMessage>(newMessage);
}
}
-
+
message.Connection = this.session.Connection;
if(IsClientAcknowledge)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Mon Aug 30 18:04:21 2010
@@ -23,185 +23,185 @@ using Apache.NMS.ActiveMQ.Util;
namespace Apache.NMS.ActiveMQ
{
- /// <summary>
- /// An object capable of sending messages to some destination
- /// </summary>
- public class MessageProducer : IMessageProducer
- {
- private Session session;
- private MemoryUsage usage = null;
- private bool closed = false;
- private object closedLock = new object();
- private readonly ProducerInfo info;
- private int producerSequenceId = 0;
-
- private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
- private TimeSpan requestTimeout;
- private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
- private MsgPriority msgPriority = NMSConstants.defaultPriority - 1;
- private bool disableMessageID = false;
- private bool disableMessageTimestamp = false;
- protected bool disposed = false;
-
- private MessageTransformation messageTransformation;
-
- public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
- {
- this.session = session;
- this.RequestTimeout = requestTimeout;
-
- this.info = new ProducerInfo();
- this.info.ProducerId = id;
- this.info.Destination = destination;
- this.info.WindowSize = session.Connection.ProducerWindowSize;
-
- this.messageTransformation = session.Connection.MessageTransformation;
-
- // If the destination contained a URI query, then use it to set public
- // properties on the ProducerInfo
- if(destination != null && destination.Options != null)
- {
- URISupport.SetProperties(this.info, destination.Options, "producer.");
- }
-
- // Version Three and higher will send us a ProducerAck, but only if we
- // have a set producer window size.
- if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
- {
+ /// <summary>
+ /// An object capable of sending messages to some destination
+ /// </summary>
+ public class MessageProducer : IMessageProducer
+ {
+ private Session session;
+ private MemoryUsage usage = null;
+ private bool closed = false;
+ private object closedLock = new object();
+ private readonly ProducerInfo info;
+ private int producerSequenceId = 0;
+
+ private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
+ private TimeSpan requestTimeout;
+ private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
+ private MsgPriority msgPriority = NMSConstants.defaultPriority - 1;
+ private bool disableMessageID = false;
+ private bool disableMessageTimestamp = false;
+ protected bool disposed = false;
+
+ private MessageTransformation messageTransformation;
+
+ public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
+ {
+ this.session = session;
+ this.RequestTimeout = requestTimeout;
+
+ this.info = new ProducerInfo();
+ this.info.ProducerId = id;
+ this.info.Destination = destination;
+ this.info.WindowSize = session.Connection.ProducerWindowSize;
+
+ this.messageTransformation = session.Connection.MessageTransformation;
+
+ // If the destination contained a URI query, then use it to set public
+ // properties on the ProducerInfo
+ if(destination != null && destination.Options != null)
+ {
+ URISupport.SetProperties(this.info, destination.Options, "producer.");
+ }
+
+ // Version Three and higher will send us a ProducerAck, but only if we
+ // have a set producer window size.
+ if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
+ {
Tracer.Debug("MessageProducer created with a Window Size of: " + this.info.WindowSize);
- this.usage = new MemoryUsage(this.info.WindowSize);
- }
- }
-
- ~MessageProducer()
- {
- Dispose(false);
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(disposed)
- {
- return;
- }
-
- if(disposing)
- {
- // Dispose managed code here.
- }
-
- try
- {
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- disposed = true;
- }
-
- public void Close()
- {
- lock(closedLock)
- {
- if(closed)
- {
- return;
- }
-
- DoClose();
- RemoveInfo removeInfo = new RemoveInfo();
- removeInfo.ObjectId = this.info.ProducerId;
- this.session.Connection.Oneway(removeInfo);
- this.session = null;
- }
- }
-
- internal void DoClose()
- {
- lock(closedLock)
- {
- if(closed)
- {
- return;
- }
-
- try
- {
- session.RemoveProducer(info.ProducerId);
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Error during producer close: {0}", ex);
- }
-
- if(this.usage != null)
- {
- this.usage.Stop();
- }
-
- closed = true;
- }
- }
-
- public void Send(IMessage message)
- {
- Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
- }
-
- public void Send(IDestination destination, IMessage message)
- {
- Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
- }
-
- public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
- }
-
- public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- Send(destination, message, deliveryMode, priority, timeToLive, true);
- }
-
- protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
- {
- if(null == destination)
- {
- // See if this producer was created without a destination.
- if(null == info.Destination)
- {
- throw new NotSupportedException();
- }
-
- // The producer was created with a destination, but an invalid destination
- // was specified.
- throw new Apache.NMS.InvalidDestinationException();
- }
-
- ActiveMQDestination dest = null;
-
- if(destination == this.info.Destination)
- {
- dest = destination as ActiveMQDestination;
- }
- else if(info.Destination == null)
- {
- dest = ActiveMQDestination.Transform(destination);
- }
- else
- {
- throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
- }
-
+ this.usage = new MemoryUsage(this.info.WindowSize);
+ }
+ }
+
+ ~MessageProducer()
+ {
+ Dispose(false);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ // Dispose managed code here.
+ }
+
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ disposed = true;
+ }
+
+ public void Close()
+ {
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ return;
+ }
+
+ DoClose();
+ RemoveInfo removeInfo = new RemoveInfo();
+ removeInfo.ObjectId = this.info.ProducerId;
+ this.session.Connection.Oneway(removeInfo);
+ this.session = null;
+ }
+ }
+
+ internal void DoClose()
+ {
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ return;
+ }
+
+ try
+ {
+ session.RemoveProducer(info.ProducerId);
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during producer close: {0}", ex);
+ }
+
+ if(this.usage != null)
+ {
+ this.usage.Stop();
+ }
+
+ closed = true;
+ }
+ }
+
+ public void Send(IMessage message)
+ {
+ Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ }
+
+ public void Send(IDestination destination, IMessage message)
+ {
+ Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ }
+
+ public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+ }
+
+ public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ Send(destination, message, deliveryMode, priority, timeToLive, true);
+ }
+
+ protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+ {
+ if(null == destination)
+ {
+ // See if this producer was created without a destination.
+ if(null == info.Destination)
+ {
+ throw new NotSupportedException();
+ }
+
+ // The producer was created with a destination, but an invalid destination
+ // was specified.
+ throw new Apache.NMS.InvalidDestinationException();
+ }
+
+ ActiveMQDestination dest = null;
+
+ if(destination == this.info.Destination)
+ {
+ dest = destination as ActiveMQDestination;
+ }
+ else if(info.Destination == null)
+ {
+ dest = ActiveMQDestination.Transform(destination);
+ }
+ else
+ {
+ throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
+ }
+
if(this.ProducerTransformer != null)
{
IMessage transformed = this.ProducerTransformer(this.session, this, message);
@@ -211,147 +211,147 @@ namespace Apache.NMS.ActiveMQ
}
}
- ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
+ ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
- activeMessage.ProducerId = info.ProducerId;
- activeMessage.Destination = dest;
- activeMessage.NMSDeliveryMode = deliveryMode;
- activeMessage.NMSPriority = priority;
-
- // Always set the message Id regardless of the disable flag.
- MessageId id = new MessageId();
- id.ProducerId = info.ProducerId;
- id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
- activeMessage.MessageId = id;
-
- if(!disableMessageTimestamp)
- {
- activeMessage.NMSTimestamp = DateTime.UtcNow;
- }
-
- if(specifiedTimeToLive)
- {
- activeMessage.NMSTimeToLive = timeToLive;
- }
-
- // Ensure there's room left to send this message
- if(this.usage != null)
- {
- usage.WaitForSpace();
- }
-
- lock(closedLock)
- {
- if(closed)
- {
- throw new ConnectionClosedException();
- }
-
- session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
- }
- }
-
- public ProducerId ProducerId
- {
- get { return info.ProducerId; }
- }
-
- public ProducerInfo ProducerInfo
- {
- get { return info; }
- }
-
- public MsgDeliveryMode DeliveryMode
- {
- get { return msgDeliveryMode; }
- set { this.msgDeliveryMode = value; }
- }
-
- public TimeSpan TimeToLive
- {
- get { return msgTimeToLive; }
- set { this.msgTimeToLive = value; }
- }
-
- public TimeSpan RequestTimeout
- {
- get { return requestTimeout; }
- set { this.requestTimeout = value; }
- }
-
- public MsgPriority Priority
- {
- get { return msgPriority; }
- set { this.msgPriority = value; }
- }
-
- public bool DisableMessageID
- {
- get { return disableMessageID; }
- set { this.disableMessageID = value; }
- }
-
- public bool DisableMessageTimestamp
- {
- get { return disableMessageTimestamp; }
- set { this.disableMessageTimestamp = value; }
- }
-
- private ProducerTransformerDelegate producerTransformer;
- public ProducerTransformerDelegate ProducerTransformer
- {
- get { return this.producerTransformer; }
- set { this.producerTransformer = value; }
- }
-
- public IMessage CreateMessage()
- {
- return session.CreateMessage();
- }
-
- public ITextMessage CreateTextMessage()
- {
- return session.CreateTextMessage();
- }
-
- public ITextMessage CreateTextMessage(string text)
- {
- return session.CreateTextMessage(text);
- }
-
- public IMapMessage CreateMapMessage()
- {
- return session.CreateMapMessage();
- }
-
- public IObjectMessage CreateObjectMessage(object body)
- {
- return session.CreateObjectMessage(body);
- }
-
- public IBytesMessage CreateBytesMessage()
- {
- return session.CreateBytesMessage();
- }
-
- public IBytesMessage CreateBytesMessage(byte[] body)
- {
- return session.CreateBytesMessage(body);
- }
-
- public IStreamMessage CreateStreamMessage()
- {
- return session.CreateStreamMessage();
- }
-
- internal void OnProducerAck(ProducerAck ack)
- {
- Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
-
- if(this.usage != null)
- {
- this.usage.DecreaseUsage( ack.Size );
- }
- }
- }
+ activeMessage.ProducerId = info.ProducerId;
+ activeMessage.Destination = dest;
+ activeMessage.NMSDeliveryMode = deliveryMode;
+ activeMessage.NMSPriority = priority;
+
+ // Always set the message Id regardless of the disable flag.
+ MessageId id = new MessageId();
+ id.ProducerId = info.ProducerId;
+ id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
+ activeMessage.MessageId = id;
+
+ if(!disableMessageTimestamp)
+ {
+ activeMessage.NMSTimestamp = DateTime.UtcNow;
+ }
+
+ if(specifiedTimeToLive)
+ {
+ activeMessage.NMSTimeToLive = timeToLive;
+ }
+
+ // Ensure there's room left to send this message
+ if(this.usage != null)
+ {
+ usage.WaitForSpace();
+ }
+
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
+ }
+ }
+
+ public ProducerId ProducerId
+ {
+ get { return info.ProducerId; }
+ }
+
+ public ProducerInfo ProducerInfo
+ {
+ get { return info; }
+ }
+
+ public MsgDeliveryMode DeliveryMode
+ {
+ get { return msgDeliveryMode; }
+ set { this.msgDeliveryMode = value; }
+ }
+
+ public TimeSpan TimeToLive
+ {
+ get { return msgTimeToLive; }
+ set { this.msgTimeToLive = value; }
+ }
+
+ public TimeSpan RequestTimeout
+ {
+ get { return requestTimeout; }
+ set { this.requestTimeout = value; }
+ }
+
+ public MsgPriority Priority
+ {
+ get { return msgPriority; }
+ set { this.msgPriority = value; }
+ }
+
+ public bool DisableMessageID
+ {
+ get { return disableMessageID; }
+ set { this.disableMessageID = value; }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get { return disableMessageTimestamp; }
+ set { this.disableMessageTimestamp = value; }
+ }
+
+ private ProducerTransformerDelegate producerTransformer;
+ public ProducerTransformerDelegate ProducerTransformer
+ {
+ get { return this.producerTransformer; }
+ set { this.producerTransformer = value; }
+ }
+
+ public IMessage CreateMessage()
+ {
+ return session.CreateMessage();
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ return session.CreateTextMessage();
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ return session.CreateTextMessage(text);
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return session.CreateMapMessage();
+ }
+
+ public IObjectMessage CreateObjectMessage(object body)
+ {
+ return session.CreateObjectMessage(body);
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return session.CreateBytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ return session.CreateBytesMessage(body);
+ }
+
+ public IStreamMessage CreateStreamMessage()
+ {
+ return session.CreateStreamMessage();
+ }
+
+ internal void OnProducerAck(ProducerAck ack)
+ {
+ Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
+
+ if(this.usage != null)
+ {
+ this.usage.DecreaseUsage( ack.Size );
+ }
+ }
+ }
}
|