qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stes...@apache.org
Subject svn commit: r481035 - in /incubator/qpid/trunk/qpid/dotnet: ./ Qpid.Client.Tests/HeadersExchange/ Qpid.Client.Tests/MultiConsumer/ Qpid.Client.Tests/failover/ Qpid.Client.Tests/requestreply1/ Qpid.Client.Tests/undeliverable/ Qpid.Client/Client/ Qpid.Co...
Date Thu, 30 Nov 2006 18:54:50 GMT
Author: steshaw
Date: Thu Nov 30 10:54:48 2006
New Revision: 481035

URL: http://svn.apache.org/viewvc?view=rev&rev=481035
Log:
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes

Modified:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
    incubator/qpid/trunk/qpid/dotnet/TODO.txt

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
Thu Nov 30 10:54:48 2006
@@ -58,8 +58,9 @@
             _channel.Bind(queueName, _serviceName, null, CreatePatternAsFieldTable());
 
             IMessageConsumer consumer = _channel.CreateConsumerBuilder(queueName)
-                .withPrefetch(100)
-                .withNoLocal(true)
+                .WithPrefetchLow(100)
+                .WithPrefetchHigh(500)
+                .WithNoLocal(true)
                 .Create();
 
             consumer.OnMessage = new MessageReceivedDelegate(OnMessage);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
Thu Nov 30 10:54:48 2006
@@ -43,8 +43,8 @@
             try
             {
                 _publisher = _channel.CreatePublisherBuilder()
-                    .withExchangeName(_commandExchangeName)
-                    .withMandatory(true)
+                    .WithExchangeName(_commandExchangeName)
+                    .WithMandatory(true)
                     .Create();
 
                 // Disabling timestamps - a performance optimisation where timestamps and
TTL/expiration 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
Thu Nov 30 10:54:48 2006
@@ -70,8 +70,8 @@
         {
             base.Init();
             _publisher = _channel.CreatePublisherBuilder()
-                .withRoutingKey(_commandQueueName)
-                .withExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_commandQueueName)
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
                 .Create();
 
             _publisher.DisableMessageTimestamp = true;
@@ -85,7 +85,7 @@
                 _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
 
                 _consumers[i] = _channel.CreateConsumerBuilder(queueName)
-                    .withPrefetch(100).Create();
+                    .WithPrefetchLow(100).Create();
                 _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage);
             }
             _connection.Start();

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs Thu Nov 30
10:54:48 2006
@@ -78,8 +78,8 @@
 
 //            _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
             _publisher = _channel.CreatePublisherBuilder()
-                .withRoutingKey(routingKey)
-                .withExchangeName(exchangeName)
+                .WithRoutingKey(routingKey)
+                .WithExchangeName(exchangeName)
                 .Create();
             _publisher.Send(msg);
         }
@@ -206,8 +206,8 @@
 
                 //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
                 return _session.CreatePublisherBuilder()
-                    .withExchangeName(exchangeName)
-                    .withRoutingKey(routingKey)
+                    .WithExchangeName(exchangeName)
+                    .WithRoutingKey(routingKey)
                     .Create();
             }
         }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs Thu Nov
30 10:54:48 2006
@@ -36,8 +36,11 @@
         const int NUM_ITERATIONS = 10;
         const int NUM_COMMITED_MESSAGES = 10;
         const int NUM_ROLLEDBACK_MESSAGES = 3;
-        const int SLEEP_MILLIS = 500;
+        const int SLEEP_MILLIS = 50;
 
+        // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge
+        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge;
+        const bool _noWait = true; // use Receive or ReceiveNoWait
         AMQConnection _connection;
 
         public void OnMessage(IMessage message)
@@ -45,6 +48,11 @@
             try
             {
                 _log.Info("Received: " + ((ITextMessage) message).Text);
+                if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+                {
+                    _log.Info("client acknowledging");
+                    message.Acknowledge();
+                }
             }
             catch (QpidException e)
             {
@@ -56,11 +64,13 @@
         {
             FailoverTxTest _failoverTxTest;
             IMessageConsumer _consumer;
+            private bool _noWait;
 
-            internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel)
+            internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel,
bool noWait)
             {
                 _failoverTxTest = failoverTxTest;
                 _consumer = channel;
+                _noWait = noWait;
             }
 
             internal void Run()
@@ -68,7 +78,9 @@
                 int messages = 0;
                 while (messages < NUM_COMMITED_MESSAGES)
                 {
-                    IMessage msg = _consumer.ReceiveNoWait();
+                    IMessage msg;
+                    if (_noWait) msg = _consumer.ReceiveNoWait();
+                    else msg = _consumer.Receive();
                     if (msg != null)
                     {
                         _log.Info("NoWait received message");
@@ -93,7 +105,8 @@
             _log.Info("connectionInfo = " + connectionInfo);
             _log.Info("connection.asUrl = " + _connection.toURL());
 
-            IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+            _log.Info("AcknowledgeMode is " + _acknowledgeMode);
+            IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
 
             string queueName = receivingChannel.GenerateUniqueName();
 
@@ -103,17 +116,17 @@
             // No need to call Queue.Bind as automatically bound to default direct exchange.
             receivingChannel.Bind(queueName, "amq.direct", queueName);
 
-
-            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create();
-            bool useThread = true;
+            IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+                .WithPrefetchLow(30)
+                .WithPrefetchHigh(60).Create();
+            bool useThread = false;
             if (useThread)
             {
-                NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer);
+                NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait);
                 new Thread(noWaitConsumer.Run).Start();
             }
             else
             {
-                //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage =
new MessageReceivedDelegate(onMessage);
                 consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
             }
 
@@ -133,7 +146,7 @@
             bool transacted = true;
             IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
             IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder()
-                .withRoutingKey(routingKey)
+                .WithRoutingKey(routingKey)
                 .Create();
 
             for (int i = 1; i <= NUM_ITERATIONS; ++i)

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
Thu Nov 30 10:54:48 2006
@@ -60,8 +60,9 @@
             _channel.DeclareQueue(_serviceName, false, false, false);
 
             IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName)
-                .withPrefetch(100)
-                .withNoLocal(true)
+                .WithPrefetchLow(100)
+                .WithPrefetchHigh(500)
+                .WithNoLocal(true)
                 .Create();
             consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
         }
@@ -100,8 +101,8 @@
 //                Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey);
 
                 _destinationPublisher = _channel.CreatePublisherBuilder()
-                    .withExchangeName(_replyToExchangeName)
-                    .withRoutingKey(_replyToRoutingKey)
+                    .WithExchangeName(_replyToExchangeName)
+                    .WithRoutingKey(_replyToRoutingKey)
                     .Create();
                 _destinationPublisher.DisableMessageTimestamp = true;
                 _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent;

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
Thu Nov 30 10:54:48 2006
@@ -53,7 +53,7 @@
             try
             {
                 _publisher = _channel.CreatePublisherBuilder()
-                    .withRoutingKey(_commandQueueName)
+                    .WithRoutingKey(_commandQueueName)
                     .Create();
                 _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this
in builder?
                 _publisher.DeliveryMode = DeliveryMode.NonPersistent;  // XXX: need a "with"
for this in builder?
@@ -74,9 +74,10 @@
             _channel.DeclareQueue(replyQueueName, false, true, true);
 
             IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
-                .withPrefetch(100)
-                .withNoLocal(true)
-                .withExclusive(true).Create();
+                .WithPrefetchLow(100)
+                .WithPrefetchHigh(200)
+                .WithNoLocal(true)
+                .WithExclusive(true).Create();
  
             _startTime = DateTime.Now.Ticks;
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
Thu Nov 30 10:54:48 2006
@@ -81,11 +81,11 @@
 
             // Send a test message to a non-existant queue on the default exchange. See if
message is returned!
             MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
-                .withRoutingKey("Non-existant route key!")
-                .withMandatory(true);
+                .WithRoutingKey("Non-existant route key!")
+                .WithMandatory(true);
             if (exchangeName != null)
             {
-                builder.withExchangeName(exchangeName);
+                builder.WithExchangeName(exchangeName);
             }
             IMessagePublisher publisher = builder.Create();
             publisher.Send(_channel.CreateTextMessage("Hiya!"));

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Thu Nov 30 10:54:48
2006
@@ -43,7 +43,7 @@
         // Used in the consume method. We generate the consume tag on the client so that
we can use the nowait feature.
         private int _nextConsumerNumber = 1;
 
-        internal const int DEFAULT_PREFETCH = 5000;
+        internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
 
         private AMQConnection _connection;
 
@@ -273,6 +273,7 @@
 
         public void Commit()
         {
+            // FIXME: Fail over safety. Needs FailoverSupport?
             CheckNotClosed();
             CheckTransacted(); // throws IllegalOperationException if not a transacted session
 
@@ -297,6 +298,7 @@
 
         public void Rollback()
         {
+            // FIXME: Fail over safety. Needs FailoverSupport?
             CheckNotClosed();
             CheckTransacted(); // throws IllegalOperationException if not a transacted session
 
@@ -489,25 +491,26 @@
         }
 
         public IMessageConsumer CreateConsumer(string queueName,
-                                               int prefetch,
+                                               int prefetchLow,
+                                               int prefetchHigh,
                                                bool noLocal,
                                                bool exclusive,
                                                bool durable,
                                                string subscriptionName)
         {
-            _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2}
exclusive={3} durable={4} subscriptionName={5}",
-                                  queueName, prefetch, noLocal, exclusive, durable, subscriptionName));
-            return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName);
+            _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2}
noLocal={3} exclusive={4} durable={5} subscriptionName={6}",
+                                  queueName, prefetchLow, prefetchHigh, noLocal, exclusive,
durable, subscriptionName));
+            return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive,
durable, subscriptionName);
         }
 
         private IMessageConsumer CreateConsumerImpl(string queueName,
-                                                      int prefetch,
-                                                      bool noLocal,
-                                                      bool exclusive,
-                                                      bool durable,
-                                                      string subscriptionName)
+                                                    int prefetchLow,
+                                                    int prefetchHigh,
+                                                    bool noLocal,
+                                                    bool exclusive,
+                                                    bool durable,
+                                                    string subscriptionName)
         {
-
             if (durable || subscriptionName != null)
             {
                 throw new NotImplementedException(); // TODO: durable subscriptions.
@@ -518,7 +521,8 @@
                 CheckNotClosed();
                
                 BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName,
noLocal,
-                                                                         _messageFactoryRegistry,
this);
+                                                                         _messageFactoryRegistry,
this,
+                                                                         prefetchHigh, prefetchLow,
exclusive);
                 try
                 {
                     RegisterConsumer(consumer);
@@ -710,9 +714,8 @@
         /// <param name="consumer"></param>
         void RegisterConsumer(BasicMessageConsumer consumer)
         {
-            String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch,
consumer.NoLocal,
+            String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
                                            consumer.Exclusive, consumer.AcknowledgeMode);
-
             consumer.ConsumerTag = consumerTag;
             _consumers.Add(consumerTag, consumer);
         }
@@ -744,8 +747,7 @@
             }
         }
 
-        private String ConsumeFromQueue(String queueName, int prefetch,
-                                    bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode
acknowledgeMode)
         {
             // Need to generate a consumer tag on the client so we can exploit the nowait
flag.
             String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
@@ -973,7 +975,6 @@
         public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
         {
             AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
-            _logger.Info("XXX sending ack: " + ackFrame);
             if (_logger.IsDebugEnabled)
             {
                 _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel
" + _channelId);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Thu Nov 30
10:54:48 2006
@@ -82,10 +82,15 @@
         /// </summary>
         private readonly object _syncLock = new object();
 
-        /**
-         * We store the prefetch field in order to be able to reuse it when resubscribing
in the event of failover
-         */
-        private int _prefetch;
+        /// <summary>
+        /// We store the high water prefetch field in order to be able to reuse it when resubscribing
in the event of failover
+        /// </summary>
+        private int _prefetchHigh;
+
+        /// <summary>
+        /// We store the low water prefetch field in order to be able to reuse it when resubscribing
in the event of failover
+        /// </summary>
+        private int _prefetchLow;
 
         /// <summary>
         /// When true indicates that either a message listener is set or that
@@ -108,8 +113,20 @@
         /// </summary>
         private long _lastDeliveryTag;
 
-        public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
-                                    MessageFactoryRegistry messageFactory, AmqChannel channel)
+        /// <summary>
+        /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+        /// </summary>
+        private int _outstanding;
+
+        /// <summary>
+        /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+        /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at
< _prefetchLow
+        /// </summary>
+        private bool _dups_ok_acknowledge_send;
+
+        internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
+                                      MessageFactoryRegistry messageFactory, AmqChannel channel,
+                                      int prefetchHigh, int prefetchLow, bool exclusive)
         {
             _channelId = channelId;
             _queueName = queueName;
@@ -117,6 +134,9 @@
             _messageFactory = messageFactory;
             _channel = channel;
             _acknowledgeMode = _channel.AcknowledgeMode;
+            _prefetchHigh = prefetchHigh;
+            _prefetchLow = prefetchLow;
+            _exclusive = exclusive;
         }
 
         #region IMessageConsumer Members
@@ -302,65 +322,6 @@
             }
         }
 
-//        /// <summary>
-//        /// Called from the AmqChannel when a message has arrived for this consumer. This
methods handles both the case
-//        /// of a message listener or a synchronous receive() caller.
-//        /// </summary>
-//        /// <param name="messageFrame">the raw unprocessed mesage</param>
-//        /// <param name="acknowledgeMode">the acknowledge mode requested for this
message</param>
-//        /// <param name="channelId">channel on which this message was sent</param>
      
-//        internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode,
ushort channelId)
-//        {
-//            _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
-//            if (_logger.IsDebugEnabled)
-//            {
-//                _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
-//            }
-//            try
-//            {
-//                AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
-//                                                                              messageFrame.DeliverBody.Redelivered,
-//                                                                              messageFrame.ContentHeader,
-//                                                                              messageFrame.Bodies);
-
-//                /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
-//                {
-//                    _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
-//                }*/
-//                if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-//                {
-//                    // we set the session so that when the user calls acknowledge() it
can call the method on session
-//                    // to send out the appropriate frame
-//                    jmsMessage.Channel = _channel;
-//                }
-
-//                lock (_syncLock)
-//                {
-//                    if (_messageListener != null)
-//                    {
-//#if __MonoCS__
-//                        _messageListener(jmsMessage);
-//#else
-//                        _messageListener.Invoke(jmsMessage);
-//#endif
-//                    }
-//                    else
-//                    {
-//                        _synchronousQueue.Enqueue(jmsMessage);
-//                    }
-//                }
-//                if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
-//                {
-//                    _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
-//                }
-//            }
-//            catch (Exception e)
-//            {
-//                _logger.Error("Caught exception (dump follows) - ignoring...", e);
-//            }
-//        }
-
-
         /**
          * Called from the AMQSession when a message has arrived for this consumer. This
methods handles both the case
          * of a message listener or a synchronous receive() caller.
@@ -465,11 +426,6 @@
             DeregisterConsumer();
         }
 
-        public int Prefetch
-        {
-            get { return _prefetch; }
-        }
-
         public string QueueName
         {
             get { return _queueName; }
@@ -509,7 +465,6 @@
             AbstractQmsMessage msg = (AbstractQmsMessage) m;
             switch (AcknowledgeMode)
             {
-/* TODO
                 case AcknowledgeMode.DupsOkAcknowledge:
                     if (++_outstanding >= _prefetchHigh)
                     {
@@ -519,16 +474,16 @@
                     {
                         _dups_ok_acknowledge_send = false;
                     }
-
                     if (_dups_ok_acknowledge_send)
                     {
-                        _channel.AcknowledgeMessage(msg.getDeliveryTag(), true);
+                        _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
                     }
                     break;
- */
+
                 case AcknowledgeMode.AutoAcknowledge:
                     _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
                     break;
+                
                 case AcknowledgeMode.SessionTransacted:
                     _lastDeliveryTag = msg.DeliveryTag;
                     break;

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs Thu Nov 30 10:54:48
2006
@@ -24,7 +24,7 @@
 namespace Qpid.Collections
 {
     public abstract class BlockingQueue : Queue
-    {        
+    {
         /**
          * Inserts the specified element into this queue if it is possible to do
          * so immediately without violating capacity restrictions, returning

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs Thu Nov 30
10:54:48 2006
@@ -373,4 +373,3 @@
         }
     }
 }
-

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs Thu Nov 30 10:54:48 2006
@@ -60,7 +60,8 @@
         MessageConsumerBuilder CreateConsumerBuilder(string queueName);
 
         IMessageConsumer CreateConsumer(string queueName,
-                                        int prefetch,
+                                        int prefetchLow, 
+                                        int prefetchHigh,
                                         bool noLocal,
                                         bool exclusive,
                                         bool durable,

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs Thu Nov 30 10:54:48
2006
@@ -22,13 +22,16 @@
 {
     public class MessageConsumerBuilder
     {
-        private int _prefetch = 0;
+        public const int DEFAULT_PREFETCH_HIGH = 5000;
+
         private bool _noLocal = false;
         private bool _exclusive = false;
         private bool _durable = false;
         private string _subscriptionName = null;
         private IChannel _channel;
         private readonly string _queueName;
+        private int _prefetchLow = 2500;
+        private int _prefetchHigh = DEFAULT_PREFETCH_HIGH;
 
         public MessageConsumerBuilder(IChannel channel, string queueName)
         {
@@ -36,31 +39,37 @@
             _queueName = queueName;
         }
 
-        public MessageConsumerBuilder withPrefetch(int prefetch)
+        public MessageConsumerBuilder WithPrefetchLow(int prefetchLow)
+        {
+            _prefetchLow = prefetchLow;
+            return this;
+        }
+
+        public MessageConsumerBuilder WithPrefetchHigh(int prefetchHigh)
         {
-            _prefetch = prefetch;
+            _prefetchHigh = prefetchHigh;
             return this;
         }
 
-        public MessageConsumerBuilder withNoLocal(bool noLocal)
+        public MessageConsumerBuilder WithNoLocal(bool noLocal)
         {
             _noLocal = noLocal;
             return this;
         }
 
-        public MessageConsumerBuilder withExclusive(bool exclusive)
+        public MessageConsumerBuilder WithExclusive(bool exclusive)
         {
             _exclusive = exclusive;
             return this;
         }
 
-        public MessageConsumerBuilder withDurable(bool durable)
+        public MessageConsumerBuilder WithDurable(bool durable)
         {
             _durable = durable;
             return this;
         }
 
-        public MessageConsumerBuilder withSubscriptionName(string subscriptionName)
+        public MessageConsumerBuilder WithSubscriptionName(string subscriptionName)
         {
             _subscriptionName = subscriptionName;
             return this;
@@ -68,7 +77,7 @@
 
         public IMessageConsumer Create()
         {
-            return _channel.CreateConsumer(_queueName, _prefetch, _noLocal, _exclusive, _durable,
_subscriptionName);
+            return _channel.CreateConsumer(_queueName, _prefetchLow, _prefetchHigh, _noLocal,
_exclusive, _durable, _subscriptionName);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs Thu Nov 30
10:54:48 2006
@@ -47,37 +47,37 @@
             _channel = channel;
         }
 
-        public MessagePublisherBuilder withRoutingKey(string routingKey)
+        public MessagePublisherBuilder WithRoutingKey(string routingKey)
         {
             _routingKey = routingKey;
             return this;
         }
 
-        public MessagePublisherBuilder withExchangeName(string exchangeName)
+        public MessagePublisherBuilder WithExchangeName(string exchangeName)
         {
             _exchangeName = exchangeName;
             return this;
         }
 
-        public MessagePublisherBuilder withDeliveryMode(DeliveryMode deliveryMode)
+        public MessagePublisherBuilder WithDeliveryMode(DeliveryMode deliveryMode)
         {
             _deliveryMode = deliveryMode;
             return this;
         }
 
-        public MessagePublisherBuilder withTimeToLive(long timeToLive)
+        public MessagePublisherBuilder WithTimeToLive(long timeToLive)
         {
             _timeToLive = timeToLive;
             return this;
         }
 
-        public MessagePublisherBuilder withImmediate(bool immediate)
+        public MessagePublisherBuilder WithImmediate(bool immediate)
         {
             _immediate = immediate;
             return this;
         }
 
-        public MessagePublisherBuilder withMandatory(bool mandatory)
+        public MessagePublisherBuilder WithMandatory(bool mandatory)
         {
             _mandatory = mandatory;
             return this;

Modified: incubator/qpid/trunk/qpid/dotnet/TODO.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/TODO.txt?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/TODO.txt (original)
+++ incubator/qpid/trunk/qpid/dotnet/TODO.txt Thu Nov 30 10:54:48 2006
@@ -1,13 +1,4 @@
 
-https://issues.apache.org/jira/browse/QPID-136
-* createSession with prefetch (warning: prefetch partly added)
-  * Do the BasicQos message after opening channel (sets up prefetch).
-
-https://issues.apache.org/jira/browse/QPID-137
-* .NET currently only supports no-ack mode. Allow acknowledgement support.
-  * Implement the PreAcknowledge ack mode. Add preDeliver/postDeliver methods in AmqSession
like the Java client.
-  * Implement Recover() with Basic.Recover.
-
 * Port Connection URL support.
 
 * Implement durable subscriptions.



Mime
View raw message