qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r585294 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Date Wed, 17 Oct 2007 00:11:59 GMT
Author: rajith
Date: Tue Oct 16 17:11:58 2007
New Revision: 585294

URL: http://svn.apache.org/viewvc?rev=585294&view=rev
Log:
There is an issue when we use client-side selectors for synchronous receive.
When a message is released (because it didn't match the selector), we need to request more credit,
because in credit mode a release doesn't automatically increase credit.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=585294&r1=585293&r2=585294&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Oct 16 17:11:58 2007
@@ -133,7 +133,6 @@
         }
     }
 
-
     public void onMessage(Message message)
     {
         int channelId = getSession().getChannelId();
@@ -272,6 +271,7 @@
                 _logger.debug("filterMessage - trying to ack message");
             }
             acknowledgeMessage(message);
+            requestCreditIfCreditMode();
         }
         else if (!messageOk)
         {
@@ -280,6 +280,7 @@
                 _logger.debug("Message not OK, releasing");
             }
             releaseMessage(message);
+            requestCreditIfCreditMode();
         }
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
@@ -292,6 +293,26 @@
             messageOk = acquireMessage(message);
         }
         return messageOk;
+    }
+
+    private void requestCreditIfCreditMode()
+    {
+        try
+        {
+            // the current message received is not good, so we need to get a message.
+            if (getMessageListener() == null)
+            {
+                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+                _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
+                _0_10session.getQpidSession().sync();
+                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+            }
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error getting message listener, couldn't request credit after releasing a message that failed the selector test",e);
+        }
     }
 
     /**



Mime
View raw message