activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-706 No Keep Alives from Broker
Date Sat, 03 Sep 2016 01:44:36 GMT
ARTEMIS-706 No Keep Alives from Broker


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/61747acf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/61747acf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/61747acf

Branch: refs/heads/master
Commit: 61747acfd12c523a89b753b07274b183c6101baa
Parents: ea293fc
Author: Howard Gao <howard.gao@gmail.com>
Authored: Fri Sep 2 12:01:03 2016 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Sep 2 21:39:44 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  74 ++++++++----
 .../openwire/OpenWireProtocolManager.java       |  46 ++++++-
 .../en/protocols-interoperability.md            |  28 +++++
 .../openwire/SimpleOpenWireTest.java            | 120 +++++++++++++++++++
 4 files changed, 246 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index aeb0f2b..b66b802 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -122,6 +124,8 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth
{
 
+   private static final KeepAliveInfo PING = new KeepAliveInfo();
+
    private final OpenWireProtocolManager protocolManager;
 
    private boolean destroyed = false;
@@ -167,6 +171,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
     */
    private ServerSession internalSession;
 
+   private volatile long lastSent = -1;
+   private ConnectionEntry connectionEntry;
+   private boolean useKeepAlive;
+   private long maxInactivityDuration;
+
    // TODO-NOW: check on why there are two connections created for every createConnection
on the client.
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
@@ -177,6 +186,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       this.server = server;
       this.protocolManager = openWireProtocolManager;
       this.wireFormat = wf;
+      this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
+      this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
    }
 
    // SecurityAuth implementation
@@ -216,6 +227,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       return info;
    }
 
+   //tells the connection that
+   //some bytes were just sent
+   public void bufferSent() {
+      lastSent = System.currentTimeMillis();
+   }
+
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
       super.bufferReceived(connectionID, buffer);
@@ -226,18 +243,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
 
-         // TODO: the server should send packets to the client based on the requested times
-
-         // the connection handles pings, negotiations directly.
-         // and delegate all other commands to manager.
-         if (command.getClass() == KeepAliveInfo.class) {
-            KeepAliveInfo info = (KeepAliveInfo) command;
-            info.setResponseRequired(false);
-            // if we don't respond to KeepAlive commands then the client will think the server
is dead and timeout
-            // for some reason KeepAliveInfo.isResponseRequired() is always false
-            sendCommand(info);
-         }
-         else {
+         // ignore pings
+         if (command.getClass() != KeepAliveInfo.class) {
             Response response = null;
 
             try {
@@ -345,16 +352,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
    }
 
    @Override
-   public boolean checkDataReceived() {
-      boolean res = dataReceived;
-
-      dataReceived = false;
-
-      return res;
+   public void flush() {
+      checkInactivity();
    }
 
-   @Override
-   public void flush() {
+   private void checkInactivity() {
+      if (!this.useKeepAlive) {
+         return;
+      }
+
+      long dur = System.currentTimeMillis() - lastSent;
+      if (dur >= this.maxInactivityDuration / 2) {
+         this.sendCommand(PING);
+      }
    }
 
    private void callFailureListeners(final ActiveMQException me) {
@@ -390,6 +400,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
          synchronized (sendLock) {
             getTransportConnection().write(buffer, false, false);
          }
+         bufferSent();
       }
       catch (IOException e) {
          throw e;
@@ -508,6 +519,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
    }
 
    private void shutdown(boolean fail) {
+
       if (fail) {
          transportConnection.forceClose();
       }
@@ -521,6 +533,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       if (context == null || destroyed) {
          return;
       }
+
       // Don't allow things to be added to the connection state while we
       // are shutting down.
       // is it necessary? even, do we need state at all?
@@ -558,6 +571,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
    @Override
    public void fail(ActiveMQException me, String message) {
+
       if (me != null) {
          ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
       }
@@ -742,6 +756,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       }
    }
 
+   public void setConnectionEntry(ConnectionEntry connectionEntry) {
+      this.connectionEntry = connectionEntry;
+   }
+
+   public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay,
final boolean useKeepAlive) {
+      this.useKeepAlive = useKeepAlive;
+      this.maxInactivityDuration = inactivityDuration;
+
+      protocolManager.getScheduledPool().schedule(new Runnable() {
+         @Override
+         public void run() {
+            if (inactivityDuration >= 0) {
+               connectionEntry.ttl = inactivityDuration;
+            }
+         }
+      }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
+      checkInactivity();
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override
@@ -1025,6 +1058,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
          wireFormat.renegotiateWireFormat(command);
          //throw back a brokerInfo here
          protocolManager.sendBrokerInfo(OpenWireConnection.this);
+         protocolManager.setUpInactivityParams(OpenWireConnection.this, command);
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index d8dd639..8497970 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -110,6 +111,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
    private boolean updateClusterClients = false;
    private boolean updateClusterClientsOnRemove = false;
 
+   //http://activemq.apache.org/activemq-inactivitymonitor.html
+   private long maxInactivityDuration = 30 * 1000L;
+   private long maxInactivityDurationInitalDelay = 10 * 1000L;
+   private boolean useKeepAlive = true;
+
    private final OpenWireMessageConverter messageConverter;
 
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer
server) {
@@ -217,8 +223,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       OpenWireConnection owConn = new OpenWireConnection(connection, server, server.getExecutorFactory().getExecutor(),
this, wf);
       owConn.sendHandshake();
 
-      // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
-      return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
+      //first we setup ttl to -1
+      //then when negotiation, we handle real ttl and delay
+      ConnectionEntry entry = new ConnectionEntry(owConn, null, System.currentTimeMillis(),
-1);
+      owConn.setConnectionEntry(entry);
+      return entry;
    }
 
    @Override
@@ -475,6 +484,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       connection.dispatch(brokerInfo);
    }
 
+   public void setUpInactivityParams(OpenWireConnection connection, WireFormatInfo command)
throws IOException {
+      long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration
? this.maxInactivityDuration : command.getMaxInactivityDuration();
+      long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay()
> this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay();
+      boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive;
+      connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse);
+   }
+
    /**
     * URI property
     */
@@ -523,4 +539,30 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       this.brokerName = name;
    }
 
+   public boolean isUseKeepAlive() {
+      return useKeepAlive;
+   }
+
+   @SuppressWarnings("unused")
+   public void setUseKeepAlive(boolean useKeepAlive) {
+      this.useKeepAlive = useKeepAlive;
+   }
+
+   public long getMaxInactivityDuration() {
+      return maxInactivityDuration;
+   }
+
+   public void setMaxInactivityDuration(long maxInactivityDuration) {
+      this.maxInactivityDuration = maxInactivityDuration;
+   }
+
+   @SuppressWarnings("unused")
+   public long getMaxInactivityDurationInitalDelay() {
+      return maxInactivityDurationInitalDelay;
+   }
+
+   @SuppressWarnings("unused")
+   public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay)
{
+      this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/docs/user-manual/en/protocols-interoperability.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md
index 35d0550..c5b32a1 100644
--- a/docs/user-manual/en/protocols-interoperability.md
+++ b/docs/user-manual/en/protocols-interoperability.md
@@ -136,6 +136,34 @@ Currently we support Apache ActiveMQ Artemis clients that using standard
JMS API
 the future we will get more supports for some advanced, Apache ActiveMQ Artemis
 specific features into Apache ActiveMQ Artemis.
 
+### Connection Monitoring
+
+OpenWire has a few parameters to control how each connection is monitored. They are:
+
+* maxInactivityDuration:
+It specifies the time (milliseconds) after which the connection is closed by the broker if
no data was received.
+Default value is 30000.
+
+* maxInactivityDurationInitalDelay:
+It specifies the maximum delay (milliseconds) before inactivity monitoring is started on
the connection. 
+It can be useful if a broker is under load with many connections being created concurrently.
+Default value is 10000.
+
+* useInactivityMonitor:
+A value of false disables the InactivityMonitor completely and connections will never time
out. 
+By default it is enabled. On the broker side you don't need to set this. Instead you can set the

+connection-ttl to -1.
+
+* useKeepAlive:
+Whether or not to send a KeepAliveInfo on an idle connection to prevent it from timing out.

+Enabled by default. Disabling the keep alive will still make connections time out if no data

+was received on the connection for the specified amount of time.
+
+Note that at the beginning the InactivityMonitor negotiates the appropriate maxInactivityDuration
and 
+maxInactivityDurationInitalDelay. The shortest duration is taken for the connection.
+
+For more details, please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
+
 ## MQTT
 
 MQTT is a light weight, client to server, publish / subscribe messaging protocol.  MQTT has
been specifically

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61747acf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 82d8242..3c2d847 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.integration.openwire;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -30,14 +32,20 @@ import javax.jms.XAConnection;
 import javax.jms.XASession;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -832,6 +840,45 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
       }
    }
 
+   /*
+    * This test creates a consumer on a connection to consume
+    * messages slowly, so the connection stays for a longer time
+    * than its configured TTL without any user data (messages)
+    * coming from broker side. It tests the working of
+    * KeepAlive mechanism without which the test will fail.
+    */
+   @Test
+   public void testSendReceiveUsingTtl() throws Exception {
+      String brokerUri = "failover://tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.maxInactivityDuration=10000&wireFormat.maxInactivityDurationInitalDelay=5000";
+      ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory(brokerUri);
+
+      Connection sendConnection = testFactory.createConnection();
+      System.out.println("created send connection: " + sendConnection);
+      Connection receiveConnection = testFactory.createConnection();
+      System.out.println("created receive connection: " + receiveConnection);
+
+      try {
+         final int nMsg = 20;
+         final long delay = 2L;
+
+         AsyncConsumer consumer = new AsyncConsumer(queueName, receiveConnection, Session.CLIENT_ACKNOWLEDGE,
delay, nMsg);
+
+         Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = sendSession.createQueue(queueName);
+
+         MessageProducer producer = sendSession.createProducer(queue);
+         for (int i = 0; i < nMsg; i++) {
+            producer.send(sendSession.createTextMessage("testXX" + i));
+         }
+
+         consumer.waitFor(nMsg * delay * 2);
+      }
+      finally {
+         sendConnection.close();
+         receiveConnection.close();
+      }
+   }
+
    @Test
    public void testCommitCloseConsumerBefore() throws Exception {
       testCommitCloseConsumer(true);
@@ -1080,4 +1127,77 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
    }
 
+   private void checkQueueEmpty(String qName) {
+      PostOffice po = server.getPostOffice();
+      LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString("jms.queue."
+ qName));
+      try {
+         //waiting for last ack to finish
+         Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+      }
+      assertEquals(0L, binding.getQueue().getMessageCount());
+   }
+
+   private class AsyncConsumer {
+
+      private List<Message> messages = new ArrayList<>();
+      private CountDownLatch latch = new CountDownLatch(1);
+      private int nMsgs;
+      private String queueName;
+
+      private MessageConsumer consumer;
+
+      AsyncConsumer(String queueName,
+                           Connection receiveConnection,
+                           final int ackMode,
+                           final long delay,
+                           final int expectedMsgs) throws JMSException {
+         this.queueName = queueName;
+         this.nMsgs = expectedMsgs;
+         Session session = receiveConnection.createSession(false, ackMode);
+         Queue queue = session.createQueue(queueName);
+         consumer = session.createConsumer(queue);
+         consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+               System.out.println("received : " + message);
+
+               messages.add(message);
+
+               if (messages.size() < expectedMsgs) {
+                  //delay
+                  try {
+                     TimeUnit.SECONDS.sleep(delay);
+                  }
+                  catch (InterruptedException e) {
+                     e.printStackTrace();
+                  }
+               }
+               if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
+                  try {
+                     message.acknowledge();
+                  }
+                  catch (JMSException e) {
+                     System.err.println("Failed to acknowledge " + message);
+                     e.printStackTrace();
+                  }
+               }
+               if (messages.size() == expectedMsgs) {
+                  latch.countDown();
+               }
+            }
+         });
+         receiveConnection.start();
+      }
+
+      public void waitFor(long timeout) throws TimeoutException, InterruptedException, JMSException
{
+         boolean result = latch.await(timeout, TimeUnit.SECONDS);
+         assertTrue(result);
+         //check queue empty
+         checkQueueEmpty(queueName);
+         //then check messages still the size and no dup.
+         assertEquals(nMsgs, messages.size());
+      }
+   }
 }


Mime
View raw message