qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From philharveyonl...@apache.org
Subject svn commit: r1441385 - in /qpid/proton/branches/jni-binding: ./ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ proton-j/proton-api/src/main/java/org/apa...
Date Fri, 01 Feb 2013 11:11:29 GMT
Author: philharveyonline
Date: Fri Feb  1 11:11:29 2013
New Revision: 1441385

URL: http://svn.apache.org/viewvc?rev=1441385&view=rev
Log:
NO-JIRA: Merged latest from trunk to this branch jni-binding with commands:

$ svn merge https://svn.apache.org/repos/asf/qpid/proton/trunk .                         
                                                                  
--- Merging r1441000 through r1441381 into '.':
U    proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
U    proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
U    proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
   C proton-j/proton/src/main/scripts
U    proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
--- Recording mergeinfo for merge of r1421251 through r1441381 into '.':
 U   .
Summary of conflicts:
  Tree conflicts: 1

$ svn merge -c 1441176 https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py ./proton-j/proton-api/src/main/resources/proton.py
--- Merging r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
U    proton-j/proton-api/src/main/resources/proton.py
--- Recording mergeinfo for merge of r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
 U   proton-j/proton-api/src/main/resources/proton.py


To resolve conflicts, we had to add JNIDelivery.isPartial(), use MessengerFactory in the Java
proton.py, and catch ProtonUnsupportedOperationException in the Messenger test.


Modified:
    qpid/proton/branches/jni-binding/   (props changed)
    qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
    qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
    qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
    qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py   (contents, props changed)
    qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java

Propchange: qpid/proton/branches/jni-binding/
------------------------------------------------------------------------------
  Merged /qpid/proton/trunk:r1441000-1441381

Modified: qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Fri Feb  1 11:11:29 2013
@@ -21,6 +21,7 @@
 package org.apache.qpid.proton.engine.jni;
 
 import org.apache.qpid.proton.ProtonCEquivalent;
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.jni.Proton;
@@ -222,6 +223,12 @@ public class JNIDelivery implements Deli
     }
 
     @Override
+    public boolean isPartial()
+    {
+        throw new ProtonUnsupportedOperationException();
+    }
+
+    @Override
     @ProtonCEquivalent("pn_delivery_settled")
     public boolean isSettled()
     {

Modified: qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java (original)
+++ qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java Fri Feb  1 11:11:29 2013
@@ -19,6 +19,7 @@
  */
 package org.apache.qpid.proton.messenger.jni;
 
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerFactory;
 
@@ -28,13 +29,13 @@ public class JNIMessengerFactory impleme
     @Override
     public Messenger createMessenger()
     {
-        throw new UnsupportedOperationException();
+        throw new ProtonUnsupportedOperationException();
     }
 
     @Override
     public Messenger createMessenger(String name)
     {
-        throw new UnsupportedOperationException();
+        throw new ProtonUnsupportedOperationException();
     }
 
 }

Modified: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Fri Feb  1 11:11:29 2013
@@ -68,4 +68,6 @@ public interface Delivery
     public Object getContext();
 
     public boolean isUpdated();
+
+    public boolean isPartial();
 }

Modified: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py Fri Feb  1 11:11:29 2013
@@ -25,11 +25,12 @@ from org.apache.qpid.proton.engine impor
     EndpointState, TransportException
 from org.apache.qpid.proton.message import \
     MessageFormat, MessageFactory, Message as JMessage
-from org.apache.qpid.proton.messenger import MessengerException, Status
+from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status
 from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue
 from org.apache.qpid.proton.amqp import UnsignedInteger
 from jarray import zeros
 from java.util import EnumSet, UUID as JUUID
+from java.util.concurrent import TimeoutException as Timeout
 
 LANGUAGE = "Java"
 
@@ -55,6 +56,7 @@ AUTOMATIC = "AUTOMATIC"
 protonFactoryLoader = ProtonFactoryLoader()
 engineFactory = protonFactoryLoader.loadFactory(EngineFactory)
 messageFactory = protonFactoryLoader.loadFactory(MessageFactory)
+messengerFactory = protonFactoryLoader.loadFactory(MessengerFactory)
 
 
 class Endpoint(object):
@@ -511,15 +513,14 @@ class Data(object):
   def __init__(self, *args, **kwargs):
     raise Skipped()
 
-class Timeout(Exception):
-  pass
-
 class Messenger(object):
 
   def __init__(self, *args, **kwargs):
-    #comment out or remove line below to enable messenger tests
-    raise Skipped()
-    self.impl = MessengerImpl()
+    try:
+      self.impl = messengerFactory.createMessenger()
+    except ProtonUnsupportedOperationException:
+      raise Skipped()
+
 
   def start(self):
     self.impl.start()
@@ -541,10 +542,9 @@ class Messenger(object):
     self.impl.recv(n)
 
   def get(self, message=None):
-    if message is None:
-      self.impl.get()
-    else:
-      message.impl = self.impl.get()
+    result = self.impl.get()
+    if message and result:
+      message.impl = result
     return self.impl.incomingTracker()
 
   @property

Propchange: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py
------------------------------------------------------------------------------
  Merged /qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py:r1441176

Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Fri Feb  1 11:11:29 2013
@@ -55,7 +55,7 @@ class ConnectorImpl<C> implements Connec
 
     private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
     private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
-    private boolean _readPending;
+    private boolean _readPending = true;
 
     ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C context, SelectionKey key)
     {
@@ -81,33 +81,56 @@ class ConnectorImpl<C> implements Connec
                 _readPending = false;
                 if (isClosed()) return;
             }
+            else
+            {
+                processInput();
+            }
             write();
         }
     }
 
-    void read() throws IOException
+    private void read() throws IOException
     {
         int bytesRead = 0;
         while ((bytesRead = _channel.read(_readBuffer)) > 0)
         {
-            _readBuffer.flip();
-            int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.limit());
-            _readBuffer.position(consumed == Transport.END_OF_STREAM ? _readBuffer.limit() : consumed);
+            processInput();
+        }
+        if (bytesRead == -1) {
+            close();
+        }
+    }
+
+    private int processInput() throws IOException
+    {
+        _readBuffer.flip();
+        int total = 0;
+        while (_readBuffer.hasRemaining())
+        {
+            int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.remaining());
+            if (consumed == Transport.END_OF_STREAM)
+            {
+                continue;
+            }
+            else if (consumed == 0)
+            {
+                break;
+            }
+            _readBuffer.position(_readBuffer.position() + consumed);
             if (_logger.isLoggable(Level.FINE))
             {
                 _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available");
             }
-            _readBuffer.compact();
-        }
-        if (bytesRead == -1) {
-            close();
+            total += consumed;
         }
+        _readBuffer.compact();
+        return total;
     }
 
-    void write() throws IOException
+    private void write() throws IOException
     {
         int interest = _key.interestOps();
-        int start = _writeBuffer.position();
+        boolean empty = _writeBuffer.position() == 0;
         boolean done = false;
         while (!done)
         {
@@ -119,19 +142,20 @@ class ConnectorImpl<C> implements Connec
             {
                 _logger.log(Level.FINE, "wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining");
             }
-            _writeBuffer.compact();
-            if (_writeBuffer.position() > 0)
+            if (_writeBuffer.hasRemaining())
             {
                 //weren't able to write all available data, ask to be notfied when we can write again
+                _writeBuffer.compact();
                 interest |= SelectionKey.OP_WRITE;
                 done = true;
             }
             else
             {
-                //we are done if buffer was empty to begin with and we did not produce enough to fill it
+                //we are done if buffer was empty to begin with and we did not produce anything
+                _writeBuffer.clear();
                 interest &= ~SelectionKey.OP_WRITE;
-                done = start == 0 && produced < _writeBuffer.capacity();
-                start = 0;
+                done = empty && produced == 0;
+                empty = true;
             }
         }
         _key.interestOps(interest);

Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Feb  1 11:11:29 2013
@@ -408,6 +408,11 @@ public class DeliveryImpl implements Del
         _complete = true;
     }
 
+    public boolean isPartial()
+    {
+        return !_complete;
+    }
+
     void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
     {
         _remoteDeliveryState = remoteDeliveryState;

Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff
==============================================================================
--- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Fri Feb  1 11:11:29 2013
@@ -134,7 +134,6 @@ public class MessengerImpl implements Me
             try
             {
                 c.process();
-                c.close();
             }
             catch (IOException e)
             {
@@ -153,6 +152,14 @@ public class MessengerImpl implements Me
                 _logger.log(Level.WARNING, "Error while closing listener", e);
             }
         }
+        try
+        {
+            waitUntil(_allClosed);
+        }
+        catch(TimeoutException e)
+        {
+            _logger.log(Level.WARNING, "Timed out while waiting for close", e);
+        }
         _driver.destroy();
     }
 
@@ -215,7 +222,7 @@ public class MessengerImpl implements Me
             Delivery delivery = connection.getWorkHead();
             while (delivery != null)
             {
-                if (delivery.isReadable())
+                if (delivery.isReadable() && !delivery.isPartial())
                 {
                     _logger.log(Level.FINE, "Readable delivery found: " + delivery);
                     int size = read((Receiver) delivery.getLink());
@@ -463,9 +470,16 @@ public class MessengerImpl implements Me
             {
                 session.close();
             }
-            if (connection.getLocalState() == EndpointState.ACTIVE && connection.getRemoteState() == EndpointState.CLOSED)
+            if (connection.getRemoteState() == EndpointState.CLOSED)
             {
-                connection.close();
+                if (connection.getLocalState() == EndpointState.ACTIVE)
+                {
+                    connection.close();
+                }
+                else if (connection.getLocalState() == EndpointState.CLOSED)
+                {
+                    c.close();
+                }
             }
 
             if (c.isClosed())
@@ -501,7 +515,7 @@ public class MessengerImpl implements Me
 
         boolean wait = deadline > System.currentTimeMillis();
         boolean first = true;
-        boolean done = condition.test();
+        boolean done = false;
 
         while (first || (!done && wait))
         {
@@ -513,6 +527,10 @@ public class MessengerImpl implements Me
             done = done || condition.test();
             first = false;
         }
+        if (!done)
+        {
+            throw new TimeoutException();
+        }
     }
 
     private Connection lookup(String host, String service)
@@ -581,7 +599,7 @@ public class MessengerImpl implements Me
             for (Connector c : _driver.connectors())
             {
                 Connection connection = c.getConnection();
-                for (Link link : new Links(connection, ACTIVE, ACTIVE))
+                for (Link link : new Links(connection, ACTIVE, ANY))
                 {
                     if (link instanceof Sender)
                     {
@@ -642,9 +660,8 @@ public class MessengerImpl implements Me
                 Delivery delivery = connection.getWorkHead();
                 while (delivery != null)
                 {
-                    if (delivery.isReadable())
+                    if (delivery.isReadable() && !delivery.isPartial())
                     {
-                        //TODO: check for partial delivery?
                         return true;
                     }
                     else
@@ -657,8 +674,19 @@ public class MessengerImpl implements Me
         }
     }
 
+    private class AllClosed implements Predicate
+    {
+        public boolean test()
+        {
+            if (_driver.connectors().iterator().hasNext()) return false;
+            else return true;
+        }
+
+    }
+
     private final SentSettled _sentSettled = new SentSettled();
     private final MessageAvailable _messageAvailable = new MessageAvailable();
+    private final AllClosed _allClosed = new AllClosed();
 
     private interface LinkFinder<C extends Link>
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message