qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/3] qpid-jms git commit: QPIDJMS-45: add test peer support for empty frames, add additional testing around idle-timeout handling
Date Fri, 08 May 2015 16:23:05 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 5b3c02920 -> 07509d98a


QPIDJMS-45: add test peer support for empty frames, add additional testing around idle-timeout handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3286fd21
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3286fd21
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3286fd21

Branch: refs/heads/master
Commit: 3286fd21321fa2c888b164292e710b962fbaf65c
Parents: 5b3c029
Author: Robert Gemmell <robbie@apache.org>
Authored: Fri May 8 17:00:29 2015 +0100
Committer: Robert Gemmell <robbie@apache.org>
Committed: Fri May 8 17:00:29 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   4 +
 .../integration/IdleTimeoutIntegrationTest.java | 180 ++++++++++++++++++-
 .../qpid/jms/test/testpeer/AmqpDataFramer.java  |   9 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  68 ++++++-
 .../qpid/jms/test/testpeer/TestFrameParser.java |   2 +-
 5 files changed, 251 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 241101f..f6c907a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -937,6 +937,10 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
         return closed.get();
     }
 
+    public boolean isFailed() {
+        return failed.get();
+    }
+
     public JmsConnectionId getConnectionId() {
         return connectionInfo.getConnectionId();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
index 984b01c..7ebd8fd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
@@ -20,21 +20,35 @@
  */
 package org.apache.qpid.jms.integration;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 
+import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matchers;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
 
+    public static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeoutIntegrationTest.class);
+
     @Test(timeout = 5000)
     public void testIdleTimeoutIsAdvertisedByDefault() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -65,7 +79,71 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin(true);
 
-            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.IdleTimeout=" + configuredTimeout);
+            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
+            Connection connection = factory.createConnection();
+            // Set a clientID to provoke the actual AMQP connection process to occur.
+            connection.setClientID("clientName");
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertNull(testPeer.getThrowable());
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testClientSendsEmptyFramesWhenPeerAdvertisesIdleTimeout() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            int period = 20;
+            int idleSleep =  100;
+            int advertisedTimeout = period * 2;
+
+            testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
+
+            testPeer.expectAnonymousConnect(true);
+            // Each connection creates a session for managing temporary destinations etc
+            testPeer.expectBegin(true);
+
+            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
+            Connection connection = factory.createConnection();
+            // Set a clientID to provoke the actual AMQP connection process to occur.
+            connection.setClientID("clientName");
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertNull(testPeer.getThrowable());
+
+            // Sleep for a bit, let the idle handling work
+            Thread.sleep(idleSleep);
+
+            testPeer.expectClose();
+
+            connection.close();
+
+            // Verify that *any* empty frames were received by the peer.
+            // We will verify additional behaviours with slower tests.
+            assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThan(0));
+        }
+    }
+
+    //TODO: Could use JUnit categories to make this slowish test skipable?
+    //      If so, make it slower still and more granular.
+    @Test(timeout = 5000)
+    public void testClientSendsEmptyFramesWithExpectedFrequency() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            int period = 250;
+            int advertisedTimeout = period * 2;
+            int cycles = 10;
+            int idleSleep =  cycles * period;
+            int offset = 2;
+
+            testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
+
+            testPeer.expectAnonymousConnect(true);
+            // Each connection creates a session for managing temporary destinations etc
+            testPeer.expectBegin(true);
+
+            ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort());
             Connection connection = factory.createConnection();
             // Set a clientID to provoke the actual AMQP connection process to occur.
             connection.setClientID("clientName");
@@ -73,8 +151,108 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
             assertNull(testPeer.getThrowable());
 
+            // Sleep for a bit, let the idle handling work
+            Thread.sleep(idleSleep);
+
             testPeer.expectClose();
+
             connection.close();
+
+            assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThanOrEqualTo(cycles - offset));
+            assertThat(testPeer.getEmptyFrameCount(), Matchers.lessThanOrEqualTo(cycles + offset));
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testConnectionSetFailedWhenPeerNeglectsToSendEmptyFrames() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            int configuredTimeout = 200;
+
+            testPeer.expectAnonymousConnect(true);
+            // Each connection creates a session for managing temporary destinations etc
+            testPeer.expectBegin(true);
+
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
+            final JmsConnection connection = (JmsConnection) factory.createConnection();
+            // Set a clientID to provoke the actual AMQP connection process to occur.
+            connection.setClientID("clientName");
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            // The peer is still connected, so it will get the close frame with error
+            testPeer.expectClose(Matchers.notNullValue());
+            assertNull(testPeer.getThrowable());
+
+            boolean failed = Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connection.isFailed();
+                }
+            }, configuredTimeout * 2, 10);
+
+            assertTrue("connection didnt fail in expected timeframe", failed);
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testConnectionNotMarkedFailedWhenPeerSendsEmptyFrames() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            int configuredTimeout = 1000;
+            int period = 250;
+            int cycles = 8;
+
+            final CountDownLatch latch = new CountDownLatch(cycles);
+
+            testPeer.expectAnonymousConnect(true);
+            // Each connection creates a session for managing temporary destinations etc
+            testPeer.expectBegin(true);
+
+            // Start to emit idle frames when the connection is set up, this should stop it timing out
+            testPeer.runAfterLastHandler(new EmptyFrameSender(latch, period, cycles, testPeer));
+
+            JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout);
+            final JmsConnection connection = (JmsConnection) factory.createConnection();
+            // Set a clientID to provoke the actual AMQP connection process to occur.
+            connection.setClientID("clientName");
+
+            boolean framesSent = latch.await(cycles * period * 2, TimeUnit.MILLISECONDS);
+            assertTrue("idle frames were not sent as expected", framesSent);
+
+            assertFalse("connection shouldnt fail", connection.isFailed());
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertNull(testPeer.getThrowable());
+        }
+    }
+
+    private static class EmptyFrameSender implements AmqpPeerRunnable
+    {
+        private int delay;
+        private int cycles;
+        private CountDownLatch latch;
+        private TestAmqpPeer testPeer;
+
+        public EmptyFrameSender(CountDownLatch latch, int delay, int cycles, TestAmqpPeer testPeer) {
+            this.cycles = cycles;
+            this.delay = delay;
+            this.latch = latch;
+            this.testPeer = testPeer;
+        }
+
+        @Override
+        public void run() {
+            for (int i = 0; i < cycles; i++) {
+                LOGGER.info("Delaying before empty frame: {}", i + 1);
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                    // pass
+                }
+                LOGGER.info("Sending empty frame: {}", i + 1);
+                testPeer.sendEmptyFrame(false);
+                latch.countDown();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
index 0fb3d42..4a4d0a8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
@@ -40,9 +40,12 @@ public class AmqpDataFramer
 
         buffer.position(8); // leave hole for frame header
 
-        Data frameBody = Proton.data(CAPACITY);
-        frameBody.putDescribedType(describedType);
-        frameBody.encode(buffer);
+        if (describedType != null) {
+            Data frameBody = Proton.data(CAPACITY);
+            frameBody.putDescribedType(describedType);
+            frameBody.encode(buffer);
+        }
+
         if(payload != null)
         {
             buffer.put(payload.asByteBuffer());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 3dd6506..c12d575 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -116,6 +116,8 @@ public class TestAmqpPeer implements AutoCloseable
     private byte[] _deferredBytes;
     private int _lastInitiatedChannel = -1;
     private UnsignedInteger _lastInitiatedLinkHandle = null;
+    private int advertisedIdleTimeout = 0;
+    private int _emptyFrameCount = 0;
 
     public TestAmqpPeer() throws IOException
     {
@@ -180,6 +182,20 @@ public class TestAmqpPeer implements AutoCloseable
         return _driverRunnable.getClientSocket();
     }
 
+    public int getAdvertisedIdleTimeout()
+    {
+        return advertisedIdleTimeout;
+    }
+
+    public void setAdvertisedIdleTimeout(int advertisedIdleTimeout)
+    {
+        this.advertisedIdleTimeout = advertisedIdleTimeout;
+    }
+
+    public int getEmptyFrameCount() {
+        return _emptyFrameCount;
+    }
+
     public void receiveHeader(byte[] header)
     {
         Handler handler = getFirstHandler();
@@ -214,6 +230,12 @@ public class TestAmqpPeer implements AutoCloseable
         }
     }
 
+    public void receiveEmptyFrame(int type, int channel)
+    {
+        _emptyFrameCount ++;
+        LOGGER.debug("Received empty frame");
+    }
+
     private void removeFirstHandler()
     {
         synchronized(_handlersLock)
@@ -279,6 +301,11 @@ public class TestAmqpPeer implements AutoCloseable
         _driverRunnable.sendBytes(header);
     }
 
+    public void sendEmptyFrame(boolean deferWrite)
+    {
+        sendFrame(FrameType.AMQP, 0, null, null, deferWrite);
+    }
+
     public void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite)
     {
         if(channel < 0)
@@ -320,6 +347,18 @@ public class TestAmqpPeer implements AutoCloseable
         }
     }
 
+    private OpenFrame createOpenFrame()
+    {
+        OpenFrame openFrame = new OpenFrame();
+        openFrame.setContainerId("test-amqp-peer-container-id");
+        if(advertisedIdleTimeout != 0)
+        {
+            openFrame.setIdleTimeOut(UnsignedInteger.valueOf(advertisedIdleTimeout));
+        }
+
+        return openFrame;
+    }
+
     public void expectAnonymousConnect(boolean authorize)
     {
         expectAnonymousConnect(authorize, null);
@@ -352,11 +391,13 @@ public class TestAmqpPeer implements AutoCloseable
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
 
+        OpenFrame openFrame = createOpenFrame();
+
         OpenMatcher openMatcher = new OpenMatcher()
             .withContainerId(notNullValue(String.class))
             .onSuccess(new FrameSender(
                     this, FrameType.AMQP, 0,
-                    new OpenFrame().setContainerId("test-amqp-peer-container-id"),
+                    openFrame,
                     null));
 
         if(idleTimeoutMatcher !=null)
@@ -397,11 +438,13 @@ public class TestAmqpPeer implements AutoCloseable
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
 
+        OpenFrame openFrame = createOpenFrame();
+
         addHandler(new OpenMatcher()
             .withContainerId(notNullValue(String.class))
             .onSuccess(new FrameSender(
                     this, FrameType.AMQP, 0,
-                    new OpenFrame().setContainerId("test-amqp-peer-container-id"),
+                    openFrame,
                     null)));
     }
 
@@ -443,8 +486,7 @@ public class TestAmqpPeer implements AutoCloseable
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
 
-        OpenFrame open = new OpenFrame();
-        open.setContainerId("test-amqp-peer-container-id");
+        OpenFrame open = createOpenFrame();
         if(serverCapabilities != null)
         {
             open.setOfferedCapabilities(serverCapabilities);
@@ -502,9 +544,8 @@ public class TestAmqpPeer implements AutoCloseable
         Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
         properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
 
-        OpenFrame open = new OpenFrame();
+        OpenFrame open = createOpenFrame();
         open.setProperties(properties);
-        open.setContainerId("test-amqp-peer-container-id");
 
         addHandler(new OpenMatcher()
             .withContainerId(notNullValue(String.class))
@@ -529,8 +570,13 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectClose()
     {
+        expectClose(Matchers.nullValue());
+    }
+
+    public void expectClose(Matcher<?> errorMatcher)
+    {
         addHandler(new CloseMatcher()
-            .withError(Matchers.nullValue())
+            .withError(errorMatcher)
             .onSuccess(new FrameSender(this, FrameType.AMQP, 0,
                     new CloseFrame(),
                     null)));
@@ -1464,4 +1510,12 @@ public class TestAmqpPeer implements AutoCloseable
             comp.add(transferSender);
         }
     }
+
+    public void runAfterLastHandler(AmqpPeerRunnable action) {
+        synchronized (_handlersLock) {
+            // Prepare a composite to insert this action at the end of the handler sequence
+            CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+            comp.add(action);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
index 893a399..63799c7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
@@ -333,7 +333,7 @@ class TestFrameParser
                         }
                         else
                         {
-                            LOGGER.debug("Ignored empty frame");
+                            _peer.receiveEmptyFrame(type, channel);
                         }
                         _size = 0;
                         currentInput = nextFramesInput;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message