qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r584826 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: BasicMessageConsumer.java BasicMessageConsumer_0_10.java
Date Mon, 15 Oct 2007 17:21:45 GMT
Author: arnaudsimon
Date: Mon Oct 15 10:21:44 2007
New Revision: 584826

URL: http://svn.apache.org/viewvc?rev=584826&view=rev
Log:
increased number of runs

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=584826&r1=584825&r2=584826&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java	(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java	Mon Oct 15 10:21:44 2007
@@ -266,7 +266,7 @@
         }
     }
 
-    private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
     {
 
         if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=584826&r1=584825&r2=584826&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java	(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java	Mon Oct 15 10:21:44 2007
@@ -40,6 +40,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This is a 0.10 message consumer.
@@ -48,6 +49,10 @@
         implements org.apache.qpidity.nclient.util.MessageListener
 {
     /**
+     * A counter for keeping the number of available messages for this consumer
+     */
+    private final AtomicLong _messageCounter = new AtomicLong(0);
+    /**
      * This class logger
      */
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -78,7 +83,7 @@
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
               rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
         _0_10session = (AMQSession_0_10) session;
-        if (messageSelector != null && messageSelector != "")
+        if (messageSelector != null && ! messageSelector.equals("") )
         {
             try
             {
@@ -161,6 +166,8 @@
             newMessage.setReplyToURL(replyToUrl);
         }
         newMessage.setContentHeader(headers);
+        // increase the counter of messages
+        _messageCounter.incrementAndGet();
         getSession().messageReceived(newMessage);
         // else ignore this message
     }
@@ -348,6 +355,11 @@
         return result;
     }
 
+      protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    {
+        _messageCounter.decrementAndGet();
+        super.preApplicationProcessing(jmsMsg);   
+    }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
@@ -393,7 +405,10 @@
                 _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
                 _0_10session.getQpidSession().sync();
                 _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                o = _synchronousQueue.poll();
+                if( _messageCounter.get() > 0 )
+                {
+                      o = _synchronousQueue.take();
+                }
             }
         }
         else



Mime
View raw message