Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4AB8CE10D for ; Fri, 1 Feb 2013 11:11:57 +0000 (UTC) Received: (qmail 80266 invoked by uid 500); 1 Feb 2013 11:11:57 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 80184 invoked by uid 500); 1 Feb 2013 11:11:55 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 80168 invoked by uid 99); 1 Feb 2013 11:11:54 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2013 11:11:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2013 11:11:52 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2F5C42388ABA; Fri, 1 Feb 2013 11:11:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1441385 - in /qpid/proton/branches/jni-binding: ./ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ proton-j/proton-api/src/main/java/org/apa... 
Date: Fri, 01 Feb 2013 11:11:29 -0000 To: commits@qpid.apache.org From: philharveyonline@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130201111130.2F5C42388ABA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: philharveyonline Date: Fri Feb 1 11:11:29 2013 New Revision: 1441385 URL: http://svn.apache.org/viewvc?rev=1441385&view=rev Log: NO-JIRA: Merged latest from trunk to this branch jni-binding with commands: $ svn merge https://svn.apache.org/repos/asf/qpid/proton/trunk . --- Merging r1441000 through r1441381 into '.': U proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java U proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java U proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java C proton-j/proton/src/main/scripts U proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java --- Recording mergeinfo for merge of r1421251 through r1441381 into '.': U . Summary of conflicts: Tree conflicts: 1 $ svn merge -c 1441176 https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py ./proton-j/proton-api/src/main/resources/proton.py --- Merging r1441176 into 'proton-j/proton-api/src/main/resources/proton.py': U proton-j/proton-api/src/main/resources/proton.py --- Recording mergeinfo for merge of r1441176 into 'proton-j/proton-api/src/main/resources/proton.py': U proton-j/proton-api/src/main/resources/proton.py To resolve conflicts, we had to add JNIDelivery.isPartial(), use MessengerFactory in Java proton.py, and catch ProtonUnsupportedOperationException in Messenger test. 
Modified: qpid/proton/branches/jni-binding/ (props changed) qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py (contents, props changed) qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Propchange: qpid/proton/branches/jni-binding/ ------------------------------------------------------------------------------ Merged /qpid/proton/trunk:r1441000-1441381 Modified: qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original) +++ qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Fri Feb 1 11:11:29 2013 @@ -21,6 +21,7 @@ package org.apache.qpid.proton.engine.jni; import org.apache.qpid.proton.ProtonCEquivalent; +import org.apache.qpid.proton.ProtonUnsupportedOperationException; import org.apache.qpid.proton.engine.Delivery; import 
org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.jni.Proton; @@ -222,6 +223,12 @@ public class JNIDelivery implements Deli } @Override + public boolean isPartial() + { + throw new ProtonUnsupportedOperationException(); + } + + @Override @ProtonCEquivalent("pn_delivery_settled") public boolean isSettled() { Modified: qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java (original) +++ qpid/proton/branches/jni-binding/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java Fri Feb 1 11:11:29 2013 @@ -19,6 +19,7 @@ */ package org.apache.qpid.proton.messenger.jni; +import org.apache.qpid.proton.ProtonUnsupportedOperationException; import org.apache.qpid.proton.messenger.Messenger; import org.apache.qpid.proton.messenger.MessengerFactory; @@ -28,13 +29,13 @@ public class JNIMessengerFactory impleme @Override public Messenger createMessenger() { - throw new UnsupportedOperationException(); + throw new ProtonUnsupportedOperationException(); } @Override public Messenger createMessenger(String name) { - throw new UnsupportedOperationException(); + throw new ProtonUnsupportedOperationException(); } } Modified: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1441385&r1=1441384&r2=1441385&view=diff 
============================================================================== --- qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original) +++ qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Fri Feb 1 11:11:29 2013 @@ -68,4 +68,6 @@ public interface Delivery public Object getContext(); public boolean isUpdated(); + + public boolean isPartial(); } Modified: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py (original) +++ qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py Fri Feb 1 11:11:29 2013 @@ -25,11 +25,12 @@ from org.apache.qpid.proton.engine impor EndpointState, TransportException from org.apache.qpid.proton.message import \ MessageFormat, MessageFactory, Message as JMessage -from org.apache.qpid.proton.messenger import MessengerException, Status +from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue from org.apache.qpid.proton.amqp import UnsignedInteger from jarray import zeros from java.util import EnumSet, UUID as JUUID +from java.util.concurrent import TimeoutException as Timeout LANGUAGE = "Java" @@ -55,6 +56,7 @@ AUTOMATIC = "AUTOMATIC" protonFactoryLoader = ProtonFactoryLoader() engineFactory = protonFactoryLoader.loadFactory(EngineFactory) messageFactory = protonFactoryLoader.loadFactory(MessageFactory) +messengerFactory = protonFactoryLoader.loadFactory(MessengerFactory) class Endpoint(object): @@ -511,15 +513,14 @@ class 
Data(object): def __init__(self, *args, **kwargs): raise Skipped() -class Timeout(Exception): - pass - class Messenger(object): def __init__(self, *args, **kwargs): - #comment out or remove line below to enable messenger tests - raise Skipped() - self.impl = MessengerImpl() + try: + self.impl = messengerFactory.createMessenger() + except ProtonUnsupportedOperationException: + raise Skipped() + def start(self): self.impl.start() @@ -541,10 +542,9 @@ class Messenger(object): self.impl.recv(n) def get(self, message=None): - if message is None: - self.impl.get() - else: - message.impl = self.impl.get() + result = self.impl.get() + if message and result: + message.impl = result return self.impl.incomingTracker() @property Propchange: qpid/proton/branches/jni-binding/proton-j/proton-api/src/main/resources/proton.py ------------------------------------------------------------------------------ Merged /qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py:r1441176 Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original) +++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Fri Feb 1 11:11:29 2013 @@ -55,7 +55,7 @@ class ConnectorImpl implements Connec private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize); private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize); - private boolean _readPending; + private boolean _readPending = true; ConnectorImpl(DriverImpl driver, Listener listener, SocketChannel c, C 
context, SelectionKey key) { @@ -81,33 +81,56 @@ class ConnectorImpl implements Connec _readPending = false; if (isClosed()) return; } + else + { + processInput(); + } write(); } } - void read() throws IOException + private void read() throws IOException { int bytesRead = 0; while ((bytesRead = _channel.read(_readBuffer)) > 0) { - _readBuffer.flip(); - int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.limit()); - _readBuffer.position(consumed == Transport.END_OF_STREAM ? _readBuffer.limit() : consumed); + processInput(); + } + if (bytesRead == -1) { + close(); + } + } + + private int processInput() throws IOException + { + _readBuffer.flip(); + int total = 0; + while (_readBuffer.hasRemaining()) + { + int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.remaining()); + if (consumed == Transport.END_OF_STREAM) + { + continue; + } + else if (consumed == 0) + { + break; + } + _readBuffer.position(_readBuffer.position() + consumed); if (_logger.isLoggable(Level.FINE)) { _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available"); } - _readBuffer.compact(); - } - if (bytesRead == -1) { - close(); + total += consumed; } + _readBuffer.compact(); + return total; } - void write() throws IOException + private void write() throws IOException { int interest = _key.interestOps(); - int start = _writeBuffer.position(); + boolean empty = _writeBuffer.position() == 0; boolean done = false; while (!done) { @@ -119,19 +142,20 @@ class ConnectorImpl implements Connec { _logger.log(Level.FINE, "wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining"); } - _writeBuffer.compact(); - if (_writeBuffer.position() > 0) + if (_writeBuffer.hasRemaining()) { //weren't able to write all available data, ask to be notfied when we can write again + _writeBuffer.compact(); interest |= SelectionKey.OP_WRITE; done = true; } else { - //we are done if buffer was empty to 
begin with and we did not produce enough to fill it + //we are done if buffer was empty to begin with and we did not produce anything + _writeBuffer.clear(); interest &= ~SelectionKey.OP_WRITE; - done = start == 0 && produced < _writeBuffer.capacity(); - start = 0; + done = empty && produced == 0; + empty = true; } } _key.interestOps(interest); Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original) +++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Feb 1 11:11:29 2013 @@ -408,6 +408,11 @@ public class DeliveryImpl implements Del _complete = true; } + public boolean isPartial() + { + return !_complete; + } + void setRemoteDeliveryState(DeliveryState remoteDeliveryState) { _remoteDeliveryState = remoteDeliveryState; Modified: qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1441385&r1=1441384&r2=1441385&view=diff ============================================================================== --- qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original) +++ qpid/proton/branches/jni-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Fri Feb 1 11:11:29 2013 @@ -134,7 +134,6 @@ public class 
MessengerImpl implements Me try { c.process(); - c.close(); } catch (IOException e) { @@ -153,6 +152,14 @@ public class MessengerImpl implements Me _logger.log(Level.WARNING, "Error while closing listener", e); } } + try + { + waitUntil(_allClosed); + } + catch(TimeoutException e) + { + _logger.log(Level.WARNING, "Timed out while waiting for close", e); + } _driver.destroy(); } @@ -215,7 +222,7 @@ public class MessengerImpl implements Me Delivery delivery = connection.getWorkHead(); while (delivery != null) { - if (delivery.isReadable()) + if (delivery.isReadable() && !delivery.isPartial()) { _logger.log(Level.FINE, "Readable delivery found: " + delivery); int size = read((Receiver) delivery.getLink()); @@ -463,9 +470,16 @@ public class MessengerImpl implements Me { session.close(); } - if (connection.getLocalState() == EndpointState.ACTIVE && connection.getRemoteState() == EndpointState.CLOSED) + if (connection.getRemoteState() == EndpointState.CLOSED) { - connection.close(); + if (connection.getLocalState() == EndpointState.ACTIVE) + { + connection.close(); + } + else if (connection.getLocalState() == EndpointState.CLOSED) + { + c.close(); + } } if (c.isClosed()) @@ -501,7 +515,7 @@ public class MessengerImpl implements Me boolean wait = deadline > System.currentTimeMillis(); boolean first = true; - boolean done = condition.test(); + boolean done = false; while (first || (!done && wait)) { @@ -513,6 +527,10 @@ public class MessengerImpl implements Me done = done || condition.test(); first = false; } + if (!done) + { + throw new TimeoutException(); + } } private Connection lookup(String host, String service) @@ -581,7 +599,7 @@ public class MessengerImpl implements Me for (Connector c : _driver.connectors()) { Connection connection = c.getConnection(); - for (Link link : new Links(connection, ACTIVE, ACTIVE)) + for (Link link : new Links(connection, ACTIVE, ANY)) { if (link instanceof Sender) { @@ -642,9 +660,8 @@ public class MessengerImpl implements Me Delivery 
delivery = connection.getWorkHead(); while (delivery != null) { - if (delivery.isReadable()) + if (delivery.isReadable() && !delivery.isPartial()) { - //TODO: check for partial delivery? return true; } else @@ -657,8 +674,19 @@ public class MessengerImpl implements Me } } + private class AllClosed implements Predicate + { + public boolean test() + { + if (_driver.connectors().iterator().hasNext()) return false; + else return true; + } + + } + private final SentSettled _sentSettled = new SentSettled(); private final MessageAvailable _messageAvailable = new MessageAvailable(); + private final AllClosed _allClosed = new AllClosed(); private interface LinkFinder { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org