Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C788B200B83 for ; Sat, 3 Sep 2016 03:44:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C5ED5160A8C; Sat, 3 Sep 2016 01:44:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 96BCE160ACB for ; Sat, 3 Sep 2016 03:44:36 +0200 (CEST) Received: (qmail 92859 invoked by uid 500); 3 Sep 2016 01:44:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 92762 invoked by uid 99); 3 Sep 2016 01:44:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 Sep 2016 01:44:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3592E053F; Sat, 3 Sep 2016 01:44:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Sat, 03 Sep 2016 01:44:36 -0000 Message-Id: In-Reply-To: <26dc5dcc6f584d2683e4bf314ea6b62f@git.apache.org> References: <26dc5dcc6f584d2683e4bf314ea6b62f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq-artemis git commit: ARTEMIS-706 No Keep Alives from Broker archived-at: Sat, 03 Sep 2016 01:44:38 -0000 ARTEMIS-706 No Keep Alives from Broker Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/61747acf Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/61747acf Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/61747acf Branch: refs/heads/master Commit: 61747acfd12c523a89b753b07274b183c6101baa Parents: ea293fc Author: Howard Gao Authored: Fri Sep 2 12:01:03 2016 +0800 Committer: Clebert Suconic Committed: Fri Sep 2 21:39:44 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 74 ++++++++---- .../openwire/OpenWireProtocolManager.java | 46 ++++++- .../en/protocols-interoperability.md | 28 +++++ .../openwire/SimpleOpenWireTest.java | 120 +++++++++++++++++++ 4 files changed, 246 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index aeb0f2b..b66b802 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.advisory.AdvisorySupport; @@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -122,6 +124,8 @@ import org.apache.activemq.wireformat.WireFormat; */ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { + private static final KeepAliveInfo PING = new KeepAliveInfo(); + private final OpenWireProtocolManager protocolManager; private boolean destroyed = false; @@ -167,6 +171,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private ServerSession internalSession; + private volatile long lastSent = -1; + private ConnectionEntry connectionEntry; + private boolean useKeepAlive; + private long maxInactivityDuration; + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, ActiveMQServer server, @@ -177,6 +186,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.server = server; this.protocolManager = openWireProtocolManager; this.wireFormat = wf; + this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); + this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); } // SecurityAuth implementation @@ -216,6 +227,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return info; } + //tells the connection that + //some bytes just sent + public void bufferSent() { + lastSent = System.currentTimeMillis(); + } + @Override public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { super.bufferReceived(connectionID, buffer); @@ -226,18 +243,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); - // TODO: the server should send packets to the client based on the requested times - - // the connection handles pings, negotiations directly. - // and delegate all other commands to manager. - if (command.getClass() == KeepAliveInfo.class) { - KeepAliveInfo info = (KeepAliveInfo) command; - info.setResponseRequired(false); - // if we don't respond to KeepAlive commands then the client will think the server is dead and timeout - // for some reason KeepAliveInfo.isResponseRequired() is always false - sendCommand(info); - } - else { + // ignore pings + if (command.getClass() != KeepAliveInfo.class) { Response response = null; try { @@ -345,16 +352,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } @Override - public boolean checkDataReceived() { - boolean res = dataReceived; - - dataReceived = false; - - return res; + public void flush() { + checkInactivity(); } - @Override - public void flush() { + private void checkInactivity() { + if (!this.useKeepAlive) { + return; + } + + long dur = System.currentTimeMillis() - lastSent; + if (dur >= this.maxInactivityDuration / 2) { + this.sendCommand(PING); + } } private void callFailureListeners(final ActiveMQException me) { @@ -390,6 +400,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se synchronized (sendLock) { getTransportConnection().write(buffer, false, false); } + bufferSent(); } catch (IOException e) { throw e; @@ -508,6 +519,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void shutdown(boolean fail) { + if (fail) { transportConnection.forceClose(); } @@ -521,6 +533,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (context == null || destroyed) { return; } + // Don't allow things to be added to the connection state while we // are shutting down. // is it necessary? even, do we need state at all? @@ -558,6 +571,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public void fail(ActiveMQException me, String message) { + if (me != null) { ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); } @@ -742,6 +756,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } + public void setConnectionEntry(ConnectionEntry connectionEntry) { + this.connectionEntry = connectionEntry; + } + + public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) { + this.useKeepAlive = useKeepAlive; + this.maxInactivityDuration = inactivityDuration; + + protocolManager.getScheduledPool().schedule(new Runnable() { + @Override + public void run() { + if (inactivityDuration >= 0) { + connectionEntry.ttl = inactivityDuration; + } + } + }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS); + checkInactivity(); + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override @@ -1025,6 +1058,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se wireFormat.renegotiateWireFormat(command); //throw back a brokerInfo here protocolManager.sendBrokerInfo(OpenWireConnection.this); + protocolManager.setUpInactivityParams(OpenWireConnection.this, command); return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index d8dd639..8497970 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -110,6 +111,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl private boolean updateClusterClients = false; private boolean updateClusterClientsOnRemove = false; + //http://activemq.apache.org/activemq-inactivitymonitor.html + private long maxInactivityDuration = 30 * 1000L; + private long maxInactivityDurationInitalDelay = 10 * 1000L; + private boolean useKeepAlive = true; + private final OpenWireMessageConverter messageConverter; public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { @@ -217,8 +223,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(), this, wf); owConn.sendHandshake(); - // TODO CLEBERT What is this constant here? we should get it from TTL initial pings - return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000); + //first we setup ttl to -1 + //then when negotiation, we handle real ttl and delay + ConnectionEntry entry = new ConnectionEntry(owConn, null, System.currentTimeMillis(), -1); + owConn.setConnectionEntry(entry); + return entry; } @Override @@ -475,6 +484,13 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl connection.dispatch(brokerInfo); } + public void setUpInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException { + long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration(); + long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay(); + boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive; + connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse); + } + /** * URI property */ @@ -523,4 +539,30 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl this.brokerName = name; } + public boolean isUseKeepAlive() { + return useKeepAlive; + } + + @SuppressWarnings("unused") + public void setUseKeepAlive(boolean useKeepAlive) { + this.useKeepAlive = useKeepAlive; + } + + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + + @SuppressWarnings("unused") + public long getMaxInactivityDurationInitalDelay() { + return maxInactivityDurationInitalDelay; + } + + @SuppressWarnings("unused") + public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { + this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/docs/user-manual/en/protocols-interoperability.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 35d0550..c5b32a1 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -136,6 +136,34 @@ Currently we support Apache ActiveMQ Artemis clients that using standard JMS API the future we will get more supports for some advanced, Apache ActiveMQ Artemis specific features into Apache ActiveMQ Artemis. +### Connection Monitoring + +OpenWire has a few paramters to control how each connection is monitored, they are: + +* maxInactivityDuration: +It specifies the time (milliseconds) after which the connection is closed by the broker if no data was received. +Default value is 30000. + +* maxInactivityDurationInitalDelay: +It specifies the maximum delay (milliseconds) before inactivity monitoring is started on the connection. +It can be useful if a broker is under load with many connections being created concurrently. +Default value is 10000. + +* useInactivityMonitor: +A value of false disables the InactivityMonitor completely and connections will never time out. +By default it is enabled. On broker side you don't neet set this. Instead you can set the +connection-ttl to -1. + +* useKeepAlive: +Whether or not to send a KeepAliveInfo on an idle connection to prevent it from timing out. +Enabled by default. Disabling the keep alive will still make connections time out if no data +was received on the connection for the specified amount of time. + +Note at the beginning the InactivityMonitor negotiates the appropriate maxInactivityDuration and +maxInactivityDurationInitalDelay. The shortest duration is taken for the connection. + +More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html). + ## MQTT MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 82d8242..3c2d847 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -30,14 +32,20 @@ import javax.jms.XAConnection; import javax.jms.XASession; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -832,6 +840,45 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + /* + * This test create a consumer on a connection to consume + * messages slowly, so the connection stay for a longer time + * than its configured TTL without any user data (messages) + * coming from broker side. It tests the working of + * KeepAlive mechanism without which the test will fail. + */ + @Test + public void testSendReceiveUsingTtl() throws Exception { + String brokerUri = "failover://tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.maxInactivityDuration=10000&wireFormat.maxInactivityDurationInitalDelay=5000"; + ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory(brokerUri); + + Connection sendConnection = testFactory.createConnection(); + System.out.println("created send connection: " + sendConnection); + Connection receiveConnection = testFactory.createConnection(); + System.out.println("created receive connection: " + receiveConnection); + + try { + final int nMsg = 20; + final long delay = 2L; + + AsyncConsumer consumer = new AsyncConsumer(queueName, receiveConnection, Session.CLIENT_ACKNOWLEDGE, delay, nMsg); + + Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = sendSession.createQueue(queueName); + + MessageProducer producer = sendSession.createProducer(queue); + for (int i = 0; i < nMsg; i++) { + producer.send(sendSession.createTextMessage("testXX" + i)); + } + + consumer.waitFor(nMsg * delay * 2); + } + finally { + sendConnection.close(); + receiveConnection.close(); + } + } + @Test public void testCommitCloseConsumerBefore() throws Exception { testCommitCloseConsumer(true); @@ -1080,4 +1127,77 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } + private void checkQueueEmpty(String qName) { + PostOffice po = server.getPostOffice(); + LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString("jms.queue." + qName)); + try { + //waiting for last ack to finish + Thread.sleep(1000); + } + catch (InterruptedException e) { + } + assertEquals(0L, binding.getQueue().getMessageCount()); + } + + private class AsyncConsumer { + + private List messages = new ArrayList<>(); + private CountDownLatch latch = new CountDownLatch(1); + private int nMsgs; + private String queueName; + + private MessageConsumer consumer; + + AsyncConsumer(String queueName, + Connection receiveConnection, + final int ackMode, + final long delay, + final int expectedMsgs) throws JMSException { + this.queueName = queueName; + this.nMsgs = expectedMsgs; + Session session = receiveConnection.createSession(false, ackMode); + Queue queue = session.createQueue(queueName); + consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + System.out.println("received : " + message); + + messages.add(message); + + if (messages.size() < expectedMsgs) { + //delay + try { + TimeUnit.SECONDS.sleep(delay); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + if (ackMode == Session.CLIENT_ACKNOWLEDGE) { + try { + message.acknowledge(); + } + catch (JMSException e) { + System.err.println("Failed to acknowledge " + message); + e.printStackTrace(); + } + } + if (messages.size() == expectedMsgs) { + latch.countDown(); + } + } + }); + receiveConnection.start(); + } + + public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException { + boolean result = latch.await(timeout, TimeUnit.SECONDS); + assertTrue(result); + //check queue empty + checkQueueEmpty(queueName); + //then check messages still the size and no dup. + assertEquals(nMsgs, messages.size()); + } + } }