qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject svn commit: r1604907 - /qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Date Mon, 23 Jun 2014 18:57:55 GMT
Author: rajith
Date: Mon Jun 23 18:57:54 2014
New Revision: 1604907

URL: http://svn.apache.org/r1604907
Log:
PROTON-589
Fixed test failures and cleaned up the code.
A few more improvements could be made.
Currently 2 SSL tests are failing, but not sure if they are genuine test
failures.

Modified:
    qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java

Modified: qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1604907&r1=1604906&r2=1604907&view=diff
==============================================================================
--- qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
(original)
+++ qpid/proton/branches/rajith-messenger-passive/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
Mon Jun 23 18:57:54 2014
@@ -237,14 +237,13 @@ public class MessengerImpl implements Me
         //close all connections.
         for (SelectableImpl sel : _selectables)
         {
-            SelectableImpl s = (SelectableImpl)sel;
-            Connection connection = s.getConnection();
+            Connection connection = sel.getConnection();
             connection.close();
-            if (!_passive && !s.getNetworkConnection().isClosed())
+            if (!_passive && !sel.getNetworkConnection().isClosed())
             {
-                s.getNetworkConnection().registerForWriteEvents(true);
+                sel.getNetworkConnection().registerForWriteEvents(true);
             }
-            s.markClosed();
+            sel.markClosed();
         }
 
         waitUntil(_allClosed);
@@ -796,10 +795,6 @@ public class MessengerImpl implements Me
             }
         }
         ConnectionContext ctx = (ConnectionContext)connection.getContext();
-        if (!_passive)
-        {
-            ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-        }
     }
 
     private void processSession(Session session)
@@ -820,10 +815,6 @@ public class MessengerImpl implements Me
             }
         }
         ConnectionContext ctx = (ConnectionContext)session.getConnection().getContext();
-        if (!_passive)
-        {
-            ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-        }
     }
 
     private void processLink(Link link)
@@ -851,10 +842,6 @@ public class MessengerImpl implements Me
             }
         }
         ConnectionContext ctx = (ConnectionContext)link.getSession().getConnection().getContext();
-        if (!_passive)
-        {
-            ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-        }
     }
 
     private void processFlow(Link link)
@@ -863,11 +850,6 @@ public class MessengerImpl implements Me
         {
             pumpOut(link.getTarget().getAddress(), (Sender)link);
         }
-        ConnectionContext ctx = (ConnectionContext)link.getSession().getConnection().getContext();
-        if (!_passive)
-        {
-            ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-        }
     }
 
     private void processDelivery(Delivery delivery)
@@ -889,11 +871,6 @@ public class MessengerImpl implements Me
         }
 
         delivery.clear();
-        ConnectionContext ctx = (ConnectionContext)link.getSession().getConnection().getContext();
-        if (!_passive)
-        {
-            ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-        }
     }
 
     private boolean waitUntil(Predicate condition) throws TimeoutException
@@ -913,14 +890,18 @@ public class MessengerImpl implements Me
     }
 
     private boolean waitUntil(Predicate condition, long timeout)
-    {
+    {        
+        if (!_passive)
+        {
+            processEvents();            
+            processPendingSelectables();
+        }
         processEvents();
 
         if (_passive)
         {
             return condition.test();
         }
-
         // wait until timeout expires or until test is true
         long now = System.currentTimeMillis();
         final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
@@ -949,7 +930,7 @@ public class MessengerImpl implements Me
                 long wakeup = (_next_drain > now) ? _next_drain - now : 0;
                 remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
             }
-            processPendingSelectables(remaining);
+            waitOnIOEvents(remaining);
             processEvents();
             if (_interrupted.get())
             {
@@ -963,7 +944,28 @@ public class MessengerImpl implements Me
     }
 
     // Used when passive mode is false.
-    private void processPendingSelectables(long timeout)
+    private void processPendingSelectables()
+    {
+        //Iterate through the Selectables and read/write if required.
+        Iterator<SelectableImpl> it = _selectables.iterator();
+        while (it.hasNext())
+        {
+            SelectableImpl sel = it.next();
+            if (sel.isCompleted())
+            {
+                it.remove();
+                continue;
+            }
+            connectionWritable(sel);
+            connectionReadable(sel);
+            if (sel.isCompleted())
+            {
+                it.remove();
+            }
+        }
+    }
+
+    private void waitOnIOEvents(long timeout)
     {
         try
         {
@@ -1105,11 +1107,6 @@ public class MessengerImpl implements Me
             _credited.add(link);
 
             // flow changed, must process it
-            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
-            if (!_passive)
-            {
-                ((SelectableImpl)ctx.getSelectable()).getNetworkConnection().registerForWriteEvents(true);
-            }
         }
 
         if (_blocked.isEmpty())
@@ -1695,8 +1692,6 @@ public class MessengerImpl implements Me
      */
     void inboundConnection(Listener listener, IoConnection networkConnection)
     {
-        //System.out.println("inboundConnection ............................................"+
_name);
-
         _worked = true;
         Connection connection = Proton.connection();
         connection.collect(_collector);
@@ -1731,13 +1726,23 @@ public class MessengerImpl implements Me
     
     void connectionReadable(SelectableImpl selectable)
     {
-        //System.out.println("connectionReadable ............................................"
+ _name);
-        
-        _worked = true;
+        if (selectable.isCompleted())
+        {
+            return;
+        }
         IoConnection networkConnection = selectable.getNetworkConnection();
         SelectableImpl sel = (SelectableImpl) selectable;
         Transport transport = sel.getTransport();
-        ByteBuffer tail = transport.tail();
+        ByteBuffer tail = null;
+        try
+        {
+            tail = transport.tail();
+        }
+        catch (Exception e1)
+        {
+            connectionClosed(sel);
+            return;
+        }
         try
         {
             int read = networkConnection.read(tail);
@@ -1746,6 +1751,10 @@ public class MessengerImpl implements Me
                 connectionClosed(sel);
                 return;
             }
+            if (read > 0)
+            {
+                _worked = true;
+            }
         }
         catch (IOException e)
         {
@@ -1760,15 +1769,13 @@ public class MessengerImpl implements Me
         {
             _logger.log(Level.SEVERE, this + " error processing input", e);
         }
-        networkConnection.registerForReadEvents(transport.capacity() > 0);
+        //Currently it doesn't work without the following. But this is best handled elsewhere.
+        //Need to investigate and find the best way of handling it/
         networkConnection.registerForWriteEvents(true);
     }
 
    void connectionWritable(SelectableImpl selectable)
     {
-       //System.out.println("connectionWritable ............................................"
+ _name);
-       
-        _worked = true;
         IoConnection networkConnection = selectable.getNetworkConnection();
         SelectableImpl sel = (SelectableImpl) selectable;
         Transport transport = sel.getTransport();
@@ -1785,11 +1792,14 @@ public class MessengerImpl implements Me
                 }
                 catch (IOException e)
                 {
-                    _logger.log(Level.SEVERE, this + " error writing to the file descriptor",
e);
+                    _logger.log(Level.SEVERE, this + " error writing to network connection
: " + e.getMessage(), e);
+                    networkConnection.registerForWriteEvents(false);
+                    return;
                     // Need to throw the exception as well.
                 }
                 if (wrote > 0)
                 {
+                    _worked = true;
                     transport.pop(wrote);
                 }
                 else
@@ -1799,7 +1809,7 @@ public class MessengerImpl implements Me
             }
             networkConnection.registerForWriteEvents(transport.pending() > 0);
 
-            if (selectable.isClosed() && !_passive)
+            if (selectable.isClosed())
             {
                 try
                 {
@@ -1836,6 +1846,5 @@ public class MessengerImpl implements Me
             }
         }
         sel.markCompleted();
-        _selectables.remove(sel);
     }
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message