qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r583251 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQConnectionURL.java AMQSession_0_10.java BasicMessageConsumer_0_10.java
Date Tue, 09 Oct 2007 18:15:46 GMT
Author: rajith
Date: Tue Oct  9 11:15:45 2007
New Revision: 583251

URL: http://svn.apache.org/viewvc?rev=583251&view=rev
Log:
Fixed an error with the credit based flow control

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Tue Oct  9 11:15:45 2007
@@ -393,7 +393,7 @@
 
     private void setVirtualHost(String virtualHost)
     {
-        if (virtualHost.startsWith("/"))
+        if (virtualHost != null && virtualHost.startsWith("/"))
         {
             virtualHost = virtualHost.substring(1);
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
Tue Oct  9 11:15:45 2007
@@ -268,7 +268,8 @@
 
     public String toString()
     {
-        StringBuffer sb = new StringBuffer();
+        return _url;
+        /*StringBuffer sb = new StringBuffer();
 
         sb.append(AMQ_PROTOCOL);
         sb.append("://");
@@ -299,7 +300,7 @@
 
         sb.append(optionsToString());
 
-        return sb.toString();
+        return sb.toString();*/
     }
 
     private String optionsToString()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Oct  9 11:15:45 2007
@@ -345,6 +345,7 @@
                                           consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
                                           consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
 
+        getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
         getCurrentException();
@@ -438,7 +439,7 @@
             {
                 getQpidSession().messageStop(consumer.getConsumerTag().toString());
                 getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
-
+                getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
             }
         }
         else

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Oct  9 11:15:45 2007
@@ -174,7 +174,7 @@
     {
         ((AMQSession_0_10) getSession()).getQpidSession().messageStop(getConsumerTag().toString());
         ((AMQSession_0_10) getSession()).getQpidSession().sync();
-        // confirm cancel 
+        // confirm cancel
         getSession().confirmConsumerCancelled(getConsumerTag());
         try
         {
@@ -303,7 +303,7 @@
       {
           // do nothing as the rollback operation will do the job.
       }
-    
+
     /**
      * Acquire a message
      *
@@ -338,8 +338,11 @@
         super.setMessageListener(messageListener);
         if (messageListener == null)
         {
-            _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
             _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+            _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+                    0xFFFFFFFF);
             _0_10session.getQpidSession().sync();
         }
         else
@@ -367,14 +370,19 @@
         if (l > 0)
         {
             o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-            _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
-            _0_10session.getQpidSession().sync();
-            o = _synchronousQueue.poll();
+            if (o == null)
+            {
+                _logger.debug("Message Didn't arrive in time, checking if one is inflight");
+               // checking if one is inflight
+                _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
+                _0_10session.getQpidSession().sync();
+                o = _synchronousQueue.poll();
+            }
         }
         else
         {
             o = _synchronousQueue.take();
         }
-        return null;
+        return o;
     }
 }



Mime
View raw message