activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1420 enforce timeout on network client handshake
Date Tue, 24 Oct 2017 19:31:39 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3c23ce0ca -> 0e0693d11


ARTEMIS-1420 enforce timeout on network client handshake


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6faffd69
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6faffd69
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6faffd69

Branch: refs/heads/master
Commit: 6faffd690e6c15eef755d12baae88171ec4ad68b
Parents: 3c23ce0
Author: Stanislav Knot <sknot@redhat.com>
Authored: Tue Oct 3 13:57:19 2017 +0200
Committer: Justin Bertram <jbertram@apache.org>
Committed: Tue Oct 24 14:28:23 2017 -0500

----------------------------------------------------------------------
 .../remoting/impl/netty/TransportConstants.java |  5 ++
 .../artemis/core/protocol/ProtocolHandler.java  | 23 ++++++
 .../core/server/ActiveMQServerLogger.java       |  4 +
 .../integration/amqp/AmqpClientTestSupport.java |  1 -
 .../integration/amqp/AmqpSendReceiveTest.java   | 25 +++----
 tests/unit-tests/pom.xml                        | 12 +++
 .../impl/netty/NettyHandshakeTimeoutTest.java   | 79 ++++++++++++++++++++
 7 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 9041348..5d86aaa 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -259,6 +259,10 @@ public class TransportConstants {
 
    public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536;
 
+   public static final String HANDSHAKE_TIMEOUT = "handshake-timeout";
+
+   public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10;
+
    static {
       Set<String> allowableAcceptorKeys = new HashSet<>();
       allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
@@ -350,6 +354,7 @@ public class TransportConstants {
       allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
       allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT);
       allowableConnectorKeys.add(TransportConstants.USE_DEFAULT_SSL_CONTEXT_PROP_NAME);
+      allowableConnectorKeys.add(TransportConstants.HANDSHAKE_TIMEOUT);
 
       ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index dba2ed5..39d07e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
@@ -38,6 +39,7 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
 import org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler;
 import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable;
@@ -99,9 +101,25 @@ public class ProtocolHandler {
 
       private final boolean httpEnabled;
 
+      private ScheduledFuture timeoutFuture;
+
+      private int handshakeTimeout;
+
+
       ProtocolDecoder(boolean http, boolean httpEnabled) {
          this.http = http;
          this.httpEnabled = httpEnabled;
+         this.handshakeTimeout = ConfigurationHelper.getIntProperty(TransportConstants.HANDSHAKE_TIMEOUT,
TransportConstants.DEFAULT_HANDSHAKE_TIMEOUT, nettyAcceptor.getConfiguration());
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext ctx) throws Exception {
+         if (handshakeTimeout > 0) {
+            timeoutFuture = scheduledThreadPool.schedule( () -> {
+               ActiveMQServerLogger.LOGGER.handshakeTimeout(handshakeTimeout);
+               ctx.channel().close();
+            }, handshakeTimeout, TimeUnit.SECONDS);
+         }
       }
 
       @Override
@@ -136,6 +154,11 @@ public class ProtocolHandler {
             return;
          }
 
+         if (handshakeTimeout > 0) {
+            timeoutFuture.cancel(true);
+            timeoutFuture = null;
+         }
+
          final int magic1 = in.getUnsignedByte(in.readerIndex());
          final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);
          if (http && isHttp(magic1, magic2)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 83ff12e..743ff78 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1908,4 +1908,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224087, value = "Error announcing backup: backupServerLocator is null. {0}",
format = Message.Format.MESSAGE_FORMAT)
    void errorAnnouncingBackup(String backupManager);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has occurred.",
format = Message.Format.MESSAGE_FORMAT)
+   void handshakeTimeout(int timeout);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 60b9b74..054e715 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -185,7 +185,6 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       HashMap<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
-
       HashMap<String, Object> amqpParams = new HashMap<>();
       configureAMQPAcceptorParameters(amqpParams);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 5c16dfb..467ae50 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,19 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Topic;
-
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -49,6 +36,18 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Topic;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+
 /**
  * Test basic send and receive scenarios using only AMQP sender and receiver links.
  */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/tests/unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml
index 6de60d0..d30cfec 100644
--- a/tests/unit-tests/pom.xml
+++ b/tests/unit-tests/pom.xml
@@ -111,6 +111,18 @@
          <version>${project.version}</version>
          <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-junit</artifactId>
+         <version>2.4.0-SNAPSHOT</version>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq.tests</groupId>
+         <artifactId>artemis-test-support</artifactId>
+         <version>2.4.0-SNAPSHOT</version>
+         <scope>test</scope>
+      </dependency>
 
    </dependencies>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6faffd69/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java
new file mode 100644
index 0000000..3098979
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+public class NettyHandshakeTimeoutTest extends ActiveMQTestBase {
+
+   protected ActiveMQServer server;
+   private Configuration conf;
+
+   @Test
+   public void testHandshakeTimeout() throws Exception {
+      int handshakeTimeout = 3;
+
+      setUp();
+      ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HANDSHAKE_TIMEOUT, handshakeTimeout);
+
+      conf = createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(new
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
+      server = addServer(ActiveMQServers.newActiveMQServer(conf, false));
+      server.start();
+      NettyTransport transport = NettyTransportFactory.createTransport(new URI("tcp://127.0.0.1:61616"));
+      transport.setTransportListener(new NettyTransportListener() {
+         @Override
+         public void onData(ByteBuf incoming) {
+
+         }
+
+         @Override
+         public void onTransportClosed() {
+         }
+
+         @Override
+         public void onTransportError(Throwable cause) {
+         }
+
+      });
+
+      try {
+         transport.connect();
+         assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(),
TimeUnit.SECONDS.toMillis(handshakeTimeout + 1)));
+      } finally {
+         transport.close();
+         tearDown();
+      }
+   }
+}


Mime
View raw message