qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r586382 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: BasicMessageConsumer.java BasicMessageConsumer_0_10.java
Date Fri, 19 Oct 2007 11:28:00 GMT
Author: arnaudsimon
Date: Fri Oct 19 04:28:00 2007
New Revision: 586382

URL: http://svn.apache.org/viewvc?rev=586382&view=rev
Log:
changed to handle async pre-fetch

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Oct 19 04:28:00 2007
@@ -207,7 +207,7 @@
         return _acknowledgeMode;
     }
 
-    private boolean isMessageListenerSet()
+    protected boolean isMessageListenerSet()
     {
         return _messageListener.get() != null;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=586382&r1=586381&r2=586382&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 19 04:28:00 2007
@@ -53,6 +53,12 @@
      * A counter for keeping the number of available messages for this consumer
      */
     private final AtomicLong _messageCounter = new AtomicLong(0);
+
+    /**
+     * Number of messages received so far
+     */
+      private final AtomicLong _messagesReceived = new AtomicLong(0);
+
     /**
      * This class logger
      */
@@ -135,6 +141,18 @@
 
     public void onMessage(Message message)
     {
+        if( isMessageListenerSet())
+        {
+            _messagesReceived.incrementAndGet();
+            if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+            {
+                // require more credit
+                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                                          AMQSession_0_10.MAX_PREFETCH);
+                _messagesReceived.set(0);
+            }
+        }
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         String consumerTag = getConsumerTag().toString();
@@ -417,6 +435,7 @@
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
+                _messagesReceived.set(0);;
             }
         }
     }



Mime
View raw message