Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 99455 invoked from network); 21 Jan 2009 22:49:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Jan 2009 22:49:13 -0000 Received: (qmail 8740 invoked by uid 500); 21 Jan 2009 22:49:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 8712 invoked by uid 500); 21 Jan 2009 22:49:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8702 invoked by uid 99); 21 Jan 2009 22:49:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jan 2009 14:49:13 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Jan 2009 22:49:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 711502388A1E; Wed, 21 Jan 2009 14:48:52 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r736458 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Dispatcher.cs MessageConsumer.cs Date: Wed, 21 Jan 2009 22:48:52 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090121224852.711502388A1E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Jan 21 14:48:51 2009 New Revision: 736458 URL: http://svn.apache.org/viewvc?rev=736458&view=rev Log: https://issues.apache.org/activemq/browse/AMQNET-106 Added the methods to send a MessagePull command when prefectch is zero and the dispatcher has no messages in its queue. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=736458&r1=736457&r2=736458&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Wed Jan 21 14:48:51 2009 @@ -34,6 +34,11 @@ AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false); bool m_bAsyncDelivery = false; bool m_bClosed = false; + + public bool isEmpty() + { + return this.queue.Count == 0; + } public void SetAsyncDelivery(AutoResetEvent eventHandle) { Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=736458&r1=736457&r2=736458&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Jan 21 14:48:51 2009 @@ -99,18 +99,21 @@ public IMessage Receive() { CheckClosed(); + SendPullRequest(0); return SetupAcknowledge(dispatcher.Dequeue()); } public IMessage Receive(System.TimeSpan timeout) { CheckClosed(); + SendPullRequest((long) timeout.TotalMilliseconds); return SetupAcknowledge(dispatcher.Dequeue(timeout)); } public IMessage ReceiveNoWait() { CheckClosed(); + SendPullRequest(-1); return SetupAcknowledge(dispatcher.DequeueNoWait()); } @@ -258,6 +261,22 @@ return message; } + + protected void SendPullRequest( long timeout ) + { + CheckClosed(); + + if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty()) + { + MessagePull messagePull = new MessagePull(); + messagePull.ConsumerId = this.info.ConsumerId; + messagePull.Destination = this.info.Destination; + messagePull.Timeout = timeout; + + Tracer.Debug("Sending MessagePull: " + messagePull); + session.Connection.OneWay(messagePull); + } + } protected void DoNothingAcknowledge(ActiveMQMessage message) {