Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7F8B017ACD for ; Fri, 8 May 2015 16:23:06 +0000 (UTC) Received: (qmail 31406 invoked by uid 500); 8 May 2015 16:23:06 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 31373 invoked by uid 500); 8 May 2015 16:23:06 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 31355 invoked by uid 99); 8 May 2015 16:23:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2015 16:23:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F692E35CD; Fri, 8 May 2015 16:23:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: robbie@apache.org To: commits@qpid.apache.org Date: Fri, 08 May 2015 16:23:05 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] qpid-jms git commit: QPIDJMS-45: add test peer support for empty frames, add additional testing around idle-timeout handling Repository: qpid-jms Updated Branches: refs/heads/master 5b3c02920 -> 07509d98a QPIDJMS-45: add test peer support for empty frames, add additional testing around idle-timeout handling Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3286fd21 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3286fd21 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3286fd21 Branch: refs/heads/master Commit: 3286fd21321fa2c888b164292e710b962fbaf65c Parents: 5b3c029 Author: Robert Gemmell Authored: Fri May 8 17:00:29 2015 +0100 Committer: Robert Gemmell Committed: Fri May 8 17:00:29 2015 +0100 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 4 + .../integration/IdleTimeoutIntegrationTest.java | 180 ++++++++++++++++++- .../qpid/jms/test/testpeer/AmqpDataFramer.java | 9 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 68 ++++++- .../qpid/jms/test/testpeer/TestFrameParser.java | 2 +- 5 files changed, 251 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 241101f..f6c907a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -937,6 +937,10 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti return closed.get(); } + public boolean isFailed() { + return failed.get(); + } + public JmsConnectionId getConnectionId() { return connectionInfo.getConnectionId(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java index 984b01c..7ebd8fd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java @@ -20,21 +20,35 @@ */ package org.apache.qpid.jms.integration; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.Wait; +import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.hamcrest.Matchers; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { + public static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeoutIntegrationTest.class); + @Test(timeout = 5000) public void testIdleTimeoutIsAdvertisedByDefault() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -65,7 +79,71 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { // Each connection creates a session for managing temporary destinations etc testPeer.expectBegin(true); - ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.IdleTimeout=" + configuredTimeout); + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout); + Connection connection = factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 5000) + public void testClientSendsEmptyFramesWhenPeerAdvertisesIdleTimeout() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + int period = 20; + int idleSleep = 100; + int advertisedTimeout = period * 2; + + testPeer.setAdvertisedIdleTimeout(advertisedTimeout); + + testPeer.expectAnonymousConnect(true); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); + Connection connection = factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + + // Sleep for a bit, let the idle handling work + Thread.sleep(idleSleep); + + testPeer.expectClose(); + + connection.close(); + + // Verify that *any* empty frames were received by the peer. + // We will verify additional behaviours with slower tests. + assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThan(0)); + } + } + + //TODO: Could use JUnit categories to make this slowish test skipable? + // If so, make it slower still and more granular. + @Test(timeout = 5000) + public void testClientSendsEmptyFramesWithExpectedFrequency() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + int period = 250; + int advertisedTimeout = period * 2; + int cycles = 10; + int idleSleep = cycles * period; + int offset = 2; + + testPeer.setAdvertisedIdleTimeout(advertisedTimeout); + + testPeer.expectAnonymousConnect(true); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); Connection connection = factory.createConnection(); // Set a clientID to provoke the actual AMQP connection process to occur. connection.setClientID("clientName"); @@ -73,8 +151,108 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); assertNull(testPeer.getThrowable()); + // Sleep for a bit, let the idle handling work + Thread.sleep(idleSleep); + testPeer.expectClose(); + connection.close(); + + assertThat(testPeer.getEmptyFrameCount(), Matchers.greaterThanOrEqualTo(cycles - offset)); + assertThat(testPeer.getEmptyFrameCount(), Matchers.lessThanOrEqualTo(cycles + offset)); + } + } + + @Test(timeout = 5000) + public void testConnectionSetFailedWhenPeerNeglectsToSendEmptyFrames() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + int configuredTimeout = 200; + + testPeer.expectAnonymousConnect(true); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout); + final JmsConnection connection = (JmsConnection) factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + // The peer is still connected, so it will get the close frame with error + testPeer.expectClose(Matchers.notNullValue()); + assertNull(testPeer.getThrowable()); + + boolean failed = Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connection.isFailed(); + } + }, configuredTimeout * 2, 10); + + assertTrue("connection didnt fail in expected timeframe", failed); + } + } + + @Test(timeout = 5000) + public void testConnectionNotMarkedFailedWhenPeerSendsEmptyFrames() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + int configuredTimeout = 1000; + int period = 250; + int cycles = 8; + + final CountDownLatch latch = new CountDownLatch(cycles); + + testPeer.expectAnonymousConnect(true); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + // Start to emit idle frames when the connection is set up, this should stop it timing out + testPeer.runAfterLastHandler(new EmptyFrameSender(latch, period, cycles, testPeer)); + + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.idleTimeout=" + configuredTimeout); + final JmsConnection connection = (JmsConnection) factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + boolean framesSent = latch.await(cycles * period * 2, TimeUnit.MILLISECONDS); + assertTrue("idle frames were not sent as expected", framesSent); + + assertFalse("connection shouldnt fail", connection.isFailed()); + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + } + } + + private static class EmptyFrameSender implements AmqpPeerRunnable + { + private int delay; + private int cycles; + private CountDownLatch latch; + private TestAmqpPeer testPeer; + + public EmptyFrameSender(CountDownLatch latch, int delay, int cycles, TestAmqpPeer testPeer) { + this.cycles = cycles; + this.delay = delay; + this.latch = latch; + this.testPeer = testPeer; + } + + @Override + public void run() { + for (int i = 0; i < cycles; i++) { + LOGGER.info("Delaying before empty frame: {}", i + 1); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // pass + } + LOGGER.info("Sending empty frame: {}", i + 1); + testPeer.sendEmptyFrame(false); + latch.countDown(); + } } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java index 0fb3d42..4a4d0a8 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java @@ -40,9 +40,12 @@ public class AmqpDataFramer buffer.position(8); // leave hole for frame header - Data frameBody = Proton.data(CAPACITY); - frameBody.putDescribedType(describedType); - frameBody.encode(buffer); + if (describedType != null) { + Data frameBody = Proton.data(CAPACITY); + frameBody.putDescribedType(describedType); + frameBody.encode(buffer); + } + if(payload != null) { buffer.put(payload.asByteBuffer()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 3dd6506..c12d575 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -116,6 +116,8 @@ public class TestAmqpPeer implements AutoCloseable private byte[] _deferredBytes; private int _lastInitiatedChannel = -1; private UnsignedInteger _lastInitiatedLinkHandle = null; + private int advertisedIdleTimeout = 0; + private int _emptyFrameCount = 0; public TestAmqpPeer() throws IOException { @@ -180,6 +182,20 @@ public class TestAmqpPeer implements AutoCloseable return _driverRunnable.getClientSocket(); } + public int getAdvertisedIdleTimeout() + { + return advertisedIdleTimeout; + } + + public void setAdvertisedIdleTimeout(int advertisedIdleTimeout) + { + this.advertisedIdleTimeout = advertisedIdleTimeout; + } + + public int getEmptyFrameCount() { + return _emptyFrameCount; + } + public void receiveHeader(byte[] header) { Handler handler = getFirstHandler(); @@ -214,6 +230,12 @@ public class TestAmqpPeer implements AutoCloseable } } + public void receiveEmptyFrame(int type, int channel) + { + _emptyFrameCount ++; + LOGGER.debug("Received empty frame"); + } + private void removeFirstHandler() { synchronized(_handlersLock) @@ -279,6 +301,11 @@ public class TestAmqpPeer implements AutoCloseable _driverRunnable.sendBytes(header); } + public void sendEmptyFrame(boolean deferWrite) + { + sendFrame(FrameType.AMQP, 0, null, null, deferWrite); + } + public void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite) { if(channel < 0) @@ -320,6 +347,18 @@ public class TestAmqpPeer implements AutoCloseable } } + private OpenFrame createOpenFrame() + { + OpenFrame openFrame = new OpenFrame(); + openFrame.setContainerId("test-amqp-peer-container-id"); + if(advertisedIdleTimeout != 0) + { + openFrame.setIdleTimeOut(UnsignedInteger.valueOf(advertisedIdleTimeout)); + } + + return openFrame; + } + public void expectAnonymousConnect(boolean authorize) { expectAnonymousConnect(authorize, null); @@ -352,11 +391,13 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); + OpenFrame openFrame = createOpenFrame(); + OpenMatcher openMatcher = new OpenMatcher() .withContainerId(notNullValue(String.class)) .onSuccess(new FrameSender( this, FrameType.AMQP, 0, - new OpenFrame().setContainerId("test-amqp-peer-container-id"), + openFrame, null)); if(idleTimeoutMatcher !=null) @@ -397,11 +438,13 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); + OpenFrame openFrame = createOpenFrame(); + addHandler(new OpenMatcher() .withContainerId(notNullValue(String.class)) .onSuccess(new FrameSender( this, FrameType.AMQP, 0, - new OpenFrame().setContainerId("test-amqp-peer-container-id"), + openFrame, null))); } @@ -443,8 +486,7 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); - OpenFrame open = new OpenFrame(); - open.setContainerId("test-amqp-peer-container-id"); + OpenFrame open = createOpenFrame(); if(serverCapabilities != null) { open.setOfferedCapabilities(serverCapabilities); @@ -502,9 +544,8 @@ public class TestAmqpPeer implements AutoCloseable Map properties = new HashMap(); properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); - OpenFrame open = new OpenFrame(); + OpenFrame open = createOpenFrame(); open.setProperties(properties); - open.setContainerId("test-amqp-peer-container-id"); addHandler(new OpenMatcher() .withContainerId(notNullValue(String.class)) @@ -529,8 +570,13 @@ public class TestAmqpPeer implements AutoCloseable public void expectClose() { + expectClose(Matchers.nullValue()); + } + + public void expectClose(Matcher errorMatcher) + { addHandler(new CloseMatcher() - .withError(Matchers.nullValue()) + .withError(errorMatcher) .onSuccess(new FrameSender(this, FrameType.AMQP, 0, new CloseFrame(), null))); @@ -1464,4 +1510,12 @@ public class TestAmqpPeer implements AutoCloseable comp.add(transferSender); } } + + public void runAfterLastHandler(AmqpPeerRunnable action) { + synchronized (_handlersLock) { + // Prepare a composite to insert this action at the end of the handler sequence + CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); + comp.add(action); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3286fd21/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java index 893a399..63799c7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java @@ -333,7 +333,7 @@ class TestFrameParser } else { - LOGGER.debug("Ignored empty frame"); + _peer.receiveEmptyFrame(type, channel); } _size = 0; currentInput = nextFramesInput; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org