qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1630806 - in /qpid/proton/branches/examples/proton-j/src: main/java/org/apache/qpid/proton/engine/impl/ test/java/org/apache/qpid/proton/systemtests/
Date Fri, 10 Oct 2014 12:44:55 GMT
Author: gsim
Date: Fri Oct 10 12:44:55 2014
New Revision: 1630806

URL: http://svn.apache.org/r1630806
Log:
PROTON-685: iterate on a copy of the values to prevent CME after the child free() calls  modifies
the map

Added:
    qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
    qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
Modified:
    qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
    qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java

Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1630806&r1=1630805&r2=1630806&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
(original)
+++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
Fri Oct 10 12:44:55 2014
@@ -103,11 +103,14 @@ public class SessionImpl extends Endpoin
         _connection.removeSessionEndpoint(_node);
         _node = null;
 
-        for(SenderImpl sender : _senders.values()) {
+        List<SenderImpl> senders = new ArrayList<SenderImpl>(_senders.values());
+        for(SenderImpl sender : senders) {
             sender.free();
         }
         _senders.clear();
-        for(ReceiverImpl receiver : _receivers.values()) {
+
+        List<ReceiverImpl> receivers = new ArrayList<ReceiverImpl>(_receivers.values());
+        for(ReceiverImpl receiver : receivers) {
             receiver.free();
         }
         _receivers.clear();

Added: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java?rev=1630806&view=auto
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
(added)
+++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
Fri Oct 10 12:44:55 2014
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.systemtests;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+
+public abstract class EngineTestBase
+{
+    private static final Logger LOGGER = Logger.getLogger(EngineTestBase.class.getName());
+
+    private final TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER);
+    private final ProtonContainer _client = new ProtonContainer("clientContainer");
+    private final ProtonContainer _server = new ProtonContainer("serverContainer");
+
+    protected TestLoggingHelper getTestLoggingHelper()
+    {
+        return _testLoggingHelper;
+    }
+
+    protected ProtonContainer getClient()
+    {
+        return _client;
+    }
+
+    protected ProtonContainer getServer()
+    {
+        return _server;
+    }
+
+    protected void assertClientHasNothingToOutput()
+    {
+        assertEquals(0, getClient().transport.getOutputBuffer().remaining());
+        getClient().transport.outputConsumed();
+    }
+
+    protected void pumpServerToClient()
+    {
+        ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
+
+        getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX
+ " ", serverBuffer);
+        assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
+
+        ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
+
+        clientBuffer.put(serverBuffer);
+
+        assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
+
+        getClient().transport.processInput().checkIsOk();
+        getServer().transport.outputConsumed();
+    }
+
+    protected void pumpClientToServer()
+    {
+        ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
+
+        getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>>
", clientBuffer);
+        assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
+
+        ByteBuffer serverBuffer = getServer().transport.getInputBuffer();
+
+        serverBuffer.put(clientBuffer);
+
+        assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining());
+
+        getClient().transport.outputConsumed();
+        getServer().transport.processInput().checkIsOk();
+    }
+
+    protected void doOutputInputCycle() throws Exception
+    {
+        pumpClientToServer();
+
+        pumpServerToClient();
+    }
+
+    protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState
remoteState)
+    {
+        assertEquals(localState, endpoint.getLocalState());
+        assertEquals(remoteState, endpoint.getRemoteState());
+    }
+
+    protected void assertTerminusEquals(org.apache.qpid.proton.amqp.transport.Target expectedTarget,
org.apache.qpid.proton.amqp.transport.Target actualTarget)
+    {
+        assertEquals(
+                ((Target)expectedTarget).getAddress(),
+                ((Target)actualTarget).getAddress());
+    }
+}

Added: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java?rev=1630806&view=auto
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
(added)
+++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
Fri Oct 10 12:44:55 2014
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.systemtests;
+
+import static java.util.EnumSet.of;
+import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
+import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
+import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.junit.Test;
+
+public class FreeTest extends EngineTestBase
+{
+    private static final Logger LOGGER = Logger.getLogger(FreeTest.class.getName());
+
+    @Test
+    public void testFreeConnectionWithMultipleSessionsAndSendersAndReceiversDoesNotThrowCME()
throws Exception
+    {
+        LOGGER.fine(bold("======== About to create transports"));
+
+        getClient().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+        getServer().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
+
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
+
+
+
+        LOGGER.fine(bold("======== About to open connections"));
+        getClient().connection.open();
+        getServer().connection.open();
+
+        doOutputInputCycle();
+
+
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
+
+        Session clientSession2 = getClient().connection.session();
+        clientSession2.open();
+
+        pumpClientToServer();
+
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+        Session serverSession2 = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertNotNull("Engine did not return expected second server session", serverSession2);
+        assertNotSame("Engine did not return expected second server session", serverSession2,
getServer().session);
+        serverSession2.open();
+
+        pumpServerToClient();
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+        assertEndpointState(clientSession2, ACTIVE, ACTIVE);
+
+
+
+        LOGGER.fine(bold("======== About to create client senders"));
+
+        getClient().source = new Source();
+        getClient().source.setAddress(null);
+
+        getClient().target = new Target();
+        getClient().target.setAddress("myQueue");
+
+        getClient().sender = getClient().session.sender("sender1");
+        getClient().sender.setTarget(getClient().target);
+        getClient().sender.setSource(getClient().source);
+
+        getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().sender.open();
+        assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
+
+
+        Sender clientSender2 = getClient().session.sender("sender2");
+        clientSender2.setTarget(getClient().target);
+        clientSender2.setSource(getClient().source);
+
+        clientSender2.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        clientSender2.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        assertEndpointState(clientSender2, UNINITIALIZED, UNINITIALIZED);
+
+        clientSender2.open();
+        assertEndpointState(clientSender2, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+
+        LOGGER.fine(bold("======== About to set up server receivers"));
+
+        getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED),
of(ACTIVE));
+        // Accept the settlement modes suggested by the client
+        getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
+        getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
+        assertTerminusEquals(getClient().target, serverRemoteTarget);
+
+        getServer().receiver.setTarget(serverRemoteTarget);
+
+        assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
+        getServer().receiver.open();
+
+        assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
+
+        Receiver serverReceiver2 = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED),
of(ACTIVE));
+        serverReceiver2.open();
+        assertEndpointState(serverReceiver2, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().sender, ACTIVE, ACTIVE);
+        assertEndpointState(clientSender2, ACTIVE, ACTIVE);
+
+
+
+        LOGGER.fine(bold("======== About to create client receivers"));
+
+        Source src = new Source();
+        src.setAddress("myQueue");
+
+        Target tgt1 = new Target();
+        tgt1.setAddress("receiver1");
+
+        getClient().receiver = getClient().session.receiver("receiver1");
+        getClient().receiver.setSource(src);
+        getClient().receiver.setTarget(tgt1);
+
+        getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().receiver.open();
+        assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+
+        Target tgt2 = new Target();
+        tgt1.setAddress("receiver2");
+
+        Receiver clientReceiver2 = getClient().session.receiver("receiver2");
+        clientReceiver2.setSource(src);
+        clientReceiver2.setTarget(tgt2);
+
+        clientReceiver2.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        clientReceiver2.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+        assertEndpointState(clientReceiver2, UNINITIALIZED, UNINITIALIZED);
+
+        clientReceiver2.open();
+        assertEndpointState(clientReceiver2, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+
+
+        LOGGER.fine(bold("======== About to set up server senders"));
+
+        getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED),
of(ACTIVE));
+        // Accept the settlement modes suggested by the client
+        getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+        getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget2 = getServer().sender.getRemoteTarget();
+        assertTerminusEquals(tgt1, serverRemoteTarget2);
+
+        getServer().sender.setTarget(serverRemoteTarget2);
+
+        assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+        getServer().sender.open();
+        assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+        Sender serverSender2 = (Sender) getServer().connection.linkHead(of(UNINITIALIZED),
of(ACTIVE));
+
+        serverRemoteTarget2 = serverSender2.getRemoteTarget();
+        assertTerminusEquals(tgt2, serverRemoteTarget2);
+        serverSender2.setTarget(serverRemoteTarget2);
+        serverSender2.open();
+        assertEndpointState(serverSender2, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+        assertEndpointState(clientReceiver2, ACTIVE, ACTIVE);
+
+
+
+        LOGGER.fine(bold("======== About to close and free client's connection"));
+
+        getClient().connection.close();
+        getClient().connection.free();
+    }
+
+}

Modified: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java?rev=1630806&r1=1630805&r2=1630806&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
(original)
+++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
Fri Oct 10 12:44:55 2014
@@ -29,7 +29,6 @@ import static org.apache.qpid.proton.sys
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.logging.Logger;
 
@@ -42,8 +41,6 @@ import org.apache.qpid.proton.amqp.messa
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
@@ -64,84 +61,79 @@ import org.junit.Test;
  *
  * Does not illustrate use of the Messenger API.
  */
-public class ProtonEngineExampleTest
+public class ProtonEngineExampleTest extends EngineTestBase
 {
     private static final Logger LOGGER = Logger.getLogger(ProtonEngineExampleTest.class.getName());
 
     private static final int BUFFER_SIZE = 4096;
 
-    private TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER);
-
-    private final ProtonContainer _client = new ProtonContainer("clientContainer");
-    private final ProtonContainer _server = new ProtonContainer("serverContainer");
-
-    private final String _targetAddress = _server.containerId + "-link1-target";
+    private final String _targetAddress = getServer().containerId + "-link1-target";
 
     @Test
     public void test() throws Exception
     {
         LOGGER.fine(bold("======== About to create transports"));
 
-        _client.transport = Proton.transport();
-        ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX);
+        getClient().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
 
-        _server.transport = Proton.transport();
-        ProtocolTracerEnabler.setProtocolTracer(_server.transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+        getServer().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
 
         doOutputInputCycle();
 
-        _client.connection = Proton.connection();
-        _client.transport.bind(_client.connection);
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
 
-        _server.connection = Proton.connection();
-        _server.transport.bind(_server.connection);
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
 
 
 
         LOGGER.fine(bold("======== About to open connections"));
-        _client.connection.open();
-        _server.connection.open();
+        getClient().connection.open();
+        getServer().connection.open();
 
         doOutputInputCycle();
 
 
 
         LOGGER.fine(bold("======== About to open sessions"));
-        _client.session = _client.connection.session();
-        _client.session.open();
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
 
         pumpClientToServer();
 
-        _server.session = _server.connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
-        assertEndpointState(_server.session, UNINITIALIZED, ACTIVE);
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
 
-        _server.session.open();
-        assertEndpointState(_server.session, ACTIVE, ACTIVE);
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
 
         pumpServerToClient();
-        assertEndpointState(_client.session, ACTIVE, ACTIVE);
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
 
 
 
         LOGGER.fine(bold("======== About to create sender"));
 
-        _client.source = new Source();
-        _client.source.setAddress(null);
+        getClient().source = new Source();
+        getClient().source.setAddress(null);
 
-        _client.target = new Target();
-        _client.target.setAddress(_targetAddress);
+        getClient().target = new Target();
+        getClient().target.setAddress(_targetAddress);
 
-        _client.sender = _client.session.sender("link1");
-        _client.sender.setTarget(_client.target);
-        _client.sender.setSource(_client.source);
+        getClient().sender = getClient().session.sender("link1");
+        getClient().sender.setTarget(getClient().target);
+        getClient().sender.setSource(getClient().source);
         // Exactly once delivery semantics
-        _client.sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-        _client.sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+        getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+        getClient().sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);
 
-        assertEndpointState(_client.sender, UNINITIALIZED, UNINITIALIZED);
+        assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
 
-        _client.sender.open();
-        assertEndpointState(_client.sender, ACTIVE, UNINITIALIZED);
+        getClient().sender.open();
+        assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
 
         pumpClientToServer();
 
@@ -152,46 +144,46 @@ public class ProtonEngineExampleTest
         // A real application would be interested in more states than simply ACTIVE, as there
         // exists the possibility that the link could have moved to another state already
e.g. CLOSED.
         // (See pipelining).
-        _server.receiver = (Receiver) _server.connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+        getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED),
of(ACTIVE));
         // Accept the settlement modes suggested by the client
-        _server.receiver.setSenderSettleMode(_server.receiver.getRemoteSenderSettleMode());
-        _server.receiver.setReceiverSettleMode(_server.receiver.getRemoteReceiverSettleMode());
+        getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
+        getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
 
-        org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = _server.receiver.getRemoteTarget();
-        assertTerminusEquals(_client.target, serverRemoteTarget);
+        org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
+        assertTerminusEquals(getClient().target, serverRemoteTarget);
 
-        _server.receiver.setTarget(applicationDeriveTarget(serverRemoteTarget));
+        getServer().receiver.setTarget(applicationDeriveTarget(serverRemoteTarget));
 
-        assertEndpointState(_server.receiver, UNINITIALIZED, ACTIVE);
-        _server.receiver.open();
+        assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
+        getServer().receiver.open();
 
-        assertEndpointState(_server.receiver, ACTIVE, ACTIVE);
+        assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
 
         pumpServerToClient();
-        assertEndpointState(_client.sender, ACTIVE, ACTIVE);
+        assertEndpointState(getClient().sender, ACTIVE, ACTIVE);
 
-        _server.receiver.flow(1);
+        getServer().receiver.flow(1);
         pumpServerToClient();
 
 
         LOGGER.fine(bold("======== About to create a message and send it to the server"));
 
-        _client.message = Proton.message();
+        getClient().message = Proton.message();
         Section messageBody = new AmqpValue("Hello");
-        _client.message.setBody(messageBody);
-        _client.messageData = new byte[BUFFER_SIZE];
-        int lengthOfEncodedMessage = _client.message.encode(_client.messageData, 0, BUFFER_SIZE);
-        _testLoggingHelper.prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(_client.messageData,
lengthOfEncodedMessage));
+        getClient().message.setBody(messageBody);
+        getClient().messageData = new byte[BUFFER_SIZE];
+        int lengthOfEncodedMessage = getClient().message.encode(getClient().messageData,
0, BUFFER_SIZE);
+        getTestLoggingHelper().prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(getClient().messageData,
lengthOfEncodedMessage));
 
         byte[] deliveryTag = "delivery1".getBytes();
-        _client.delivery = _client.sender.delivery(deliveryTag);
-        int numberOfBytesAcceptedBySender = _client.sender.send(_client.messageData, 0, lengthOfEncodedMessage);
+        getClient().delivery = getClient().sender.delivery(deliveryTag);
+        int numberOfBytesAcceptedBySender = getClient().sender.send(getClient().messageData,
0, lengthOfEncodedMessage);
         assertEquals("For simplicity, assume the sender can accept all the data",
                      lengthOfEncodedMessage, numberOfBytesAcceptedBySender);
 
-        assertNull(_client.delivery.getLocalState());
+        assertNull(getClient().delivery.getLocalState());
 
-        boolean senderAdvanced = _client.sender.advance();
+        boolean senderAdvanced = getClient().sender.advance();
         assertTrue("sender has not advanced", senderAdvanced);
 
         pumpClientToServer();
@@ -199,106 +191,106 @@ public class ProtonEngineExampleTest
 
         LOGGER.fine(bold("======== About to process the message on the server"));
 
-        _server.delivery = _server.connection.getWorkHead();
+        getServer().delivery = getServer().connection.getWorkHead();
         assertEquals("The received delivery should be on our receiver",
-                _server.receiver, _server.delivery.getLink());
-        assertNull(_server.delivery.getLocalState());
-        assertNull(_server.delivery.getRemoteState());
+                getServer().receiver, getServer().delivery.getLink());
+        assertNull(getServer().delivery.getLocalState());
+        assertNull(getServer().delivery.getRemoteState());
 
-        assertFalse(_server.delivery.isPartial());
-        assertTrue(_server.delivery.isReadable());
+        assertFalse(getServer().delivery.isPartial());
+        assertTrue(getServer().delivery.isReadable());
 
-        _server.messageData = new byte[BUFFER_SIZE];
-        int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData,
0, BUFFER_SIZE);
+        getServer().messageData = new byte[BUFFER_SIZE];
+        int numberOfBytesProducedByReceiver = getServer().receiver.recv(getServer().messageData,
0, BUFFER_SIZE);
         assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver);
 
-        _server.message = Proton.message();
-        _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver);
+        getServer().message = Proton.message();
+        getServer().message.decode(getServer().messageData, 0, numberOfBytesProducedByReceiver);
 
-        boolean messageProcessed = applicationProcessMessage(_server.message);
+        boolean messageProcessed = applicationProcessMessage(getServer().message);
         assertTrue(messageProcessed);
 
-        _server.delivery.disposition(Accepted.getInstance());
-        assertEquals(Accepted.getInstance(), _server.delivery.getLocalState());
+        getServer().delivery.disposition(Accepted.getInstance());
+        assertEquals(Accepted.getInstance(), getServer().delivery.getLocalState());
 
         pumpServerToClient();
-        assertEquals(Accepted.getInstance(), _client.delivery.getRemoteState());
+        assertEquals(Accepted.getInstance(), getClient().delivery.getRemoteState());
 
 
         LOGGER.fine(bold("======== About to accept and settle the message on the client"));
 
-        Delivery clientDelivery = _client.connection.getWorkHead();
-        assertEquals(_client.delivery, clientDelivery);
+        Delivery clientDelivery = getClient().connection.getWorkHead();
+        assertEquals(getClient().delivery, clientDelivery);
         assertTrue(clientDelivery.isUpdated());
-        assertEquals(_client.sender, clientDelivery.getLink());
+        assertEquals(getClient().sender, clientDelivery.getLink());
         clientDelivery.disposition(clientDelivery.getRemoteState());
-        assertEquals(Accepted.getInstance(), _client.delivery.getLocalState());
+        assertEquals(Accepted.getInstance(), getClient().delivery.getLocalState());
 
         clientDelivery.settle();
-        assertNull("Now we've settled, the delivery should no longer be in the work list",
_client.connection.getWorkHead());
+        assertNull("Now we've settled, the delivery should no longer be in the work list",
getClient().connection.getWorkHead());
 
         pumpClientToServer();
 
 
         LOGGER.fine(bold("======== About to settle the message on the server"));
 
-        assertEquals(Accepted.getInstance(), _server.delivery.getRemoteState());
-        Delivery serverDelivery = _server.connection.getWorkHead();
-        assertEquals(_server.delivery, serverDelivery);
+        assertEquals(Accepted.getInstance(), getServer().delivery.getRemoteState());
+        Delivery serverDelivery = getServer().connection.getWorkHead();
+        assertEquals(getServer().delivery, serverDelivery);
         assertTrue(serverDelivery.isUpdated());
         assertTrue("Client should have already settled", serverDelivery.remotelySettled());
         serverDelivery.settle();
         assertTrue(serverDelivery.isSettled());
-        assertNull("Now we've settled, the delivery should no longer be in the work list",
_server.connection.getWorkHead());
+        assertNull("Now we've settled, the delivery should no longer be in the work list",
getServer().connection.getWorkHead());
 
         // Increment the receiver's credit so its ready for another message.
         // When using proton-c, this call is required in order to generate a Flow frame
         // (proton-j sends one even without it to eagerly restore the session incoming window).
-        _server.receiver.flow(1);
+        getServer().receiver.flow(1);
         pumpServerToClient();
 
 
         LOGGER.fine(bold("======== About to close client's sender"));
 
-        _client.sender.close();
+        getClient().sender.close();
 
         pumpClientToServer();
 
 
         LOGGER.fine(bold("======== Server about to process client's link closure"));
 
-        assertSame(_server.receiver, _server.connection.linkHead(of(ACTIVE), of(CLOSED)));
-        _server.receiver.close();
+        assertSame(getServer().receiver, getServer().connection.linkHead(of(ACTIVE), of(CLOSED)));
+        getServer().receiver.close();
 
         pumpServerToClient();
 
 
         LOGGER.fine(bold("======== About to close client's session"));
 
-        _client.session.close();
+        getClient().session.close();
 
         pumpClientToServer();
 
 
         LOGGER.fine(bold("======== Server about to process client's session closure"));
 
-        assertSame(_server.session, _server.connection.sessionHead(of(ACTIVE), of(CLOSED)));
-        _server.session.close();
+        assertSame(getServer().session, getServer().connection.sessionHead(of(ACTIVE), of(CLOSED)));
+        getServer().session.close();
 
         pumpServerToClient();
 
 
         LOGGER.fine(bold("======== About to close client's connection"));
 
-        _client.connection.close();
+        getClient().connection.close();
 
         pumpClientToServer();
 
 
         LOGGER.fine(bold("======== Server about to process client's connection closure"));
 
-        assertEquals(CLOSED, _server.connection.getRemoteState());
-        _server.connection.close();
+        assertEquals(CLOSED, getServer().connection.getRemoteState());
+        getServer().connection.close();
 
         pumpServerToClient();
 
@@ -331,66 +323,4 @@ public class ProtonEngineExampleTest
         Object messageBody = ((AmqpValue)message.getBody()).getValue();
         return "Hello".equals(messageBody);
     }
-
-    private void assertTerminusEquals(
-            org.apache.qpid.proton.amqp.transport.Target expectedTarget,
-            org.apache.qpid.proton.amqp.transport.Target actualTarget)
-    {
-        assertEquals(
-                ((Target)expectedTarget).getAddress(),
-                ((Target)actualTarget).getAddress());
-    }
-
-    private void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState
remoteState)
-    {
-        assertEquals(localState, endpoint.getLocalState());
-        assertEquals(remoteState, endpoint.getRemoteState());
-    }
-
-    private void doOutputInputCycle() throws Exception
-    {
-        pumpClientToServer();
-
-        pumpServerToClient();
-    }
-
-    private void pumpClientToServer()
-    {
-        ByteBuffer clientBuffer = _client.transport.getOutputBuffer();
-
-        _testLoggingHelper.prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ",
clientBuffer);
-        assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
-
-        ByteBuffer serverBuffer = _server.transport.getInputBuffer();
-
-        serverBuffer.put(clientBuffer);
-
-        assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining());
-
-        _client.transport.outputConsumed();
-        _server.transport.processInput().checkIsOk();
-    }
-
-    private void pumpServerToClient()
-    {
-        ByteBuffer serverBuffer = _server.transport.getOutputBuffer();
-
-        _testLoggingHelper.prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX
+ " ", serverBuffer);
-        assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
-
-        ByteBuffer clientBuffer = _client.transport.getInputBuffer();
-
-        clientBuffer.put(serverBuffer);
-
-        assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
-
-        _client.transport.processInput().checkIsOk();
-        _server.transport.outputConsumed();
-    }
-
-    private void assertClientHasNothingToOutput()
-    {
-        assertEquals(0, _client.transport.getOutputBuffer().remaining());
-        _client.transport.outputConsumed();
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message