qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arnaudsi...@apache.org
Subject svn commit: r581912 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpidity/nclient/impl/ common/src/main/java/org/apache/qpidity/transport/
Date Thu, 04 Oct 2007 13:57:04 GMT
Author: arnaudsimon
Date: Thu Oct  4 06:57:01 2007
New Revision: 581912

URL: http://svn.apache.org/viewvc?rev=581912&view=rev
Log:
Changed to send message ack when required

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=581912&r1=581911&r2=581912&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct  4 06:57:01 2007
@@ -265,10 +265,10 @@
      *
      * @todo This is accessed only within a synchronized method, so does not need to be atomic.
      */
-    private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+    protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
     /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
-    private final boolean _immediatePrefetch;
+    protected final boolean _immediatePrefetch;
 
     /** Indicates that warnings should be generated on violations of the strict AMQP. */
     private final boolean _strictAMQP;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=581912&r1=581911&r2=581912&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Oct  4 06:57:01 2007
@@ -528,4 +528,26 @@
             startDistpatcherIfNecessary();
         }
     }
+
+      synchronized void startDistpatcherIfNecessary()
+    {
+        // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+        if (!_immediatePrefetch)
+        {
+            // We do this now if this is the first call on a started connection
+            if (isSuspended() &&  _firstDispatcher.getAndSet(false))
+            {
+                try
+                {
+                    suspendChannel(false);
+                }
+                catch (AMQException e)
+                {
+                    _logger.info("Unsuspending channel threw an exception:" + e);
+                }
+            }
+        }
+
+        startDistpatcherIfNecessary(false);
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=581912&r1=581911&r2=581912&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Thu Oct  4 06:57:01 2007
@@ -27,12 +27,9 @@
     {
         for (Range range : ranges)
         {
-            for (long l = range.getLower(); l <= range.getUpper(); l++)
-            {
-                System.out.println("Acknowleding transfer id : " + l);
-                super.processed(l);
-            }
+            super.processed(range);
         }
+        super.flushProcessed();
     }
 
     public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=581912&r1=581911&r2=581912&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Thu Oct  4 06:57:01 2007
@@ -111,7 +111,7 @@
         }
     }
 
-    void flushProcessed()
+   public void flushProcessed()
     {
         long mark = -1;
         boolean first = true;



Mime
View raw message