qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1516427 - in /qpid/proton/trunk: proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/ proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/...
Date Thu, 22 Aug 2013 12:14:20 GMT
Author: rhs
Date: Thu Aug 22 12:14:20 2013
New Revision: 1516427

URL: http://svn.apache.org/r1516427
Log:
Fixed hang in Messenger.stop(); added recv() to Messenger interface.

Modified:
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Thu Aug 22 12:14:20 2013
@@ -100,6 +100,12 @@ class JNIMessenger implements Messenger
     }
 
     @Override
+    public void recv() throws TimeoutException
+    {
+        recv(-1);
+    }
+
+    @Override
     public void recv(final int count) throws TimeoutException
     {
         int err = Proton.pn_messenger_recv(_impl, count);

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Thu Aug 22 12:14:20 2013
@@ -106,6 +106,12 @@ public interface Messenger
      */
     void subscribe(String source) throws MessengerException;
     /**
+     * Receives an arbitrary number of messages into the
+     * incoming queue of the Messenger. This method will block until
+     * at least one message is available or the operation times out.
+     */
+    void recv() throws TimeoutException;
+    /**
      * Receives up to the specified number of messages into the
      * incoming queue of the Messenger. This method will block until
      * at least one message is available or the operation times out.

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1516427&r1=1516426&r2=1516427&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Aug 22 12:14:20 2013
@@ -137,14 +137,6 @@ public class MessengerImpl implements Me
         {
             Connection connection = c.getConnection();
             connection.close();
-            try
-            {
-                c.process();
-            }
-            catch (IOException e)
-            {
-                _logger.log(Level.WARNING, "Error while sending close", e);
-            }
         }
         //stop listeners
         for (Listener<?> l : _driver.listeners())
@@ -246,6 +238,11 @@ public class MessengerImpl implements Me
         waitUntil(_messageAvailable);
     }
 
+    public void recv() throws TimeoutException
+    {
+        recv(-1);
+    }
+
     public int receiving()
     {
         return _receiving;
@@ -435,6 +432,7 @@ public class MessengerImpl implements Me
         distributeCredit();
         for (Connector<?> c : _driver.connectors())
         {
+            processEndpoints(c);
             try
             {
                 if (c.process()) {
@@ -445,7 +443,6 @@ public class MessengerImpl implements Me
             {
                 _logger.log(Level.SEVERE, "Error processing connection", e);
             }
-            processEndpoints(c);
         }
         bringDestruction();
         distributeCredit();
@@ -483,6 +480,22 @@ public class MessengerImpl implements Me
                 _logger.log(Level.SEVERE, "Error processing connection", e);
             }
             processEndpoints(c);
+            if (c.isClosed())
+            {
+                _awaitingDestruction.add(c);
+                reclaimCredit(c.getConnection());
+            }
+            else
+            {
+                try
+                {
+                    c.process();
+                }
+                catch (IOException e)
+                {
+                    _logger.log(Level.SEVERE, "Error processing connection", e);
+                }
+            }
         }
         bringDestruction();
         distributeCredit();
@@ -540,23 +553,6 @@ public class MessengerImpl implements Me
                 connection.close();
             }
         }
-
-        if (c.isClosed())
-        {
-            _awaitingDestruction.add(c);
-            reclaimCredit(connection);
-        }
-        else
-        {
-            try
-            {
-                c.process();
-            }
-            catch (IOException e)
-            {
-                _logger.log(Level.SEVERE, "Error processing connection", e);
-            }
-        }
     }
 
     private boolean waitUntil(Predicate condition) throws TimeoutException
@@ -771,8 +767,12 @@ public class MessengerImpl implements Me
     {
         public boolean test()
         {
-            if (_driver.connectors().iterator().hasNext()) return false;
-            else return true;
+            for (Connector<?> c : _driver.connectors()) {
+                if (!c.isClosed()) {
+                    return false;
+                }
+            }
+            return true;
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message