qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-1100: also protect against an NPE that occurs if sender link has messages on it before the Open frame is sent
Date Tue, 26 Jan 2016 14:19:00 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/0.12.x 8c6c7c531 -> 271b36363


PROTON-1100: also protect against an NPE that occurs if sender link has messages on it before
the Open frame is sent

(cherry picked from commit 6422e2497b62b46db9e993059bc514a53a8ed643)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/193bcebd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/193bcebd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/193bcebd

Branch: refs/heads/0.12.x
Commit: 193bcebd72e0d7f6107732db7dbb2bc4704345f1
Parents: 8c6c7c5
Author: Robert Gemmell <robbie@apache.org>
Authored: Tue Jan 26 13:30:14 2016 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Tue Jan 26 14:16:22 2016 +0000

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/TransportImpl.java  |   2 +-
 .../proton/engine/impl/TransportImplTest.java   | 108 ++++++++++++++++++-
 2 files changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193bcebd/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 93335b0..d85794f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -489,7 +489,7 @@ public class TransportImpl extends EndpointImpl
 
     private void processTransportWork()
     {
-        if(_connectionEndpoint != null)
+        if(_connectionEndpoint != null && _isOpenSent)
         {
             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
             while(delivery != null)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193bcebd/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index a2e2c78..888f4af 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -29,22 +29,32 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Role;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.message.Message;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -58,6 +68,8 @@ public class TransportImplTest
     private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null);
     private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null);
 
+    private static final int BUFFER_SIZE = 4096;
+
     @Rule
     public ExpectedException _expectedException = ExpectedException.none();
 
@@ -457,7 +469,7 @@ public class TransportImplTest
      * be pipelined together.
      */
     @Test
-    public void testReceiverFlowWithoutOpen()
+    public void testReceiverFlowBeforeOpenConnection()
     {
         MockTransportImpl transport = new MockTransportImpl();
         Connection connection = Proton.connection();
@@ -485,6 +497,76 @@ public class TransportImplTest
         assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
     }
 
+    @Test
+    public void testSenderSendBeforeOpenConnection()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        sendMessage(sender, "tag1", "content1");
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
+
+        // Now open the connection, expect the Open and Begin and Attach frames but
+        // nothing else as the sender won't have credit yet.
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        // Send the necessary responses to open/begin/attach then give sender credit
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        // Now pump the transport again and expect a transfer for the message
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+    }
+
     private void pumpMockTransport(MockTransportImpl transport)
     {
         while(transport.pending() > 0)
@@ -507,4 +589,28 @@ public class TransportImplTest
             return result;
         }
     }
+
+    private Delivery sendMessage(Sender sender, String deliveryTag, String messageContent)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Message m = Message.Factory.create();
+        m.setBody(new AmqpValue(messageContent));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Delivery delivery = sender.delivery(tag);
+
+        int sent = sender.send(encoded, 0, len);
+
+        assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
+
+        boolean senderAdvanced = sender.advance();
+        assertTrue("sender has not advanced", senderAdvanced);
+
+        return delivery;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message