activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6638
Date Wed, 05 Apr 2017 20:20:37 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 548403ad9 -> 0752d840b


https://issues.apache.org/jira/browse/AMQ-6638

Adds some additional logging to the connection validation code and adds
some additional tests as well.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0752d840
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0752d840
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0752d840

Branch: refs/heads/master
Commit: 0752d840b90a5acdb04410362fb16c943f09dc2f
Parents: 548403a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Apr 5 16:20:28 2017 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Apr 5 16:20:28 2017 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/AmqpWireFormat.java |   8 +
 .../transport/amqp/AmqpTestSupport.java         |  16 +-
 .../transport/amqp/client/AmqpMessage.java      |  78 +++++-
 .../amqp/interop/AmqpExpiredMessageTest.java    | 269 +++++++++++++++++++
 4 files changed, 360 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 2b81ec7..89facbe 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -32,9 +32,13 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmqpWireFormat implements WireFormat {
 
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpWireFormat.class);
+
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
     public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
     public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
@@ -137,18 +141,22 @@ public class AmqpWireFormat implements WireFormat {
      */
     public boolean isHeaderValid(AmqpHeader header, boolean authenticated) {
         if (!header.hasValidPrefix()) {
+            LOG.trace("AMQP Header arrived with invalid prefix: {}", header);
             return false;
         }
 
         if (!(header.getProtocolId() == 0 || header.getProtocolId() == SASL_PROTOCOL)) {
+            LOG.trace("AMQP Header arrived with invalid protocol ID: {}", header);
             return false;
         }
 
         if (!authenticated && !isAllowNonSaslConnections() && header.getProtocolId()
!= SASL_PROTOCOL) {
+            LOG.trace("AMQP Header arrived without SASL and server requires SASL: {}", header);
             return false;
         }
 
         if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0)
{
+            LOG.trace("AMQP Header arrived invalid version: {}", header);
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index fd4accb..86402dc 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -72,7 +72,7 @@ public class AmqpTestSupport {
     protected ExecutorService testService = Executors.newSingleThreadExecutor();
 
     protected BrokerService brokerService;
-    protected Vector<Throwable> exceptions = new Vector<Throwable>();
+    protected Vector<Throwable> exceptions = new Vector<>();
     protected int numberOfMessages;
 
     protected URI amqpURI;
@@ -150,7 +150,7 @@ public class AmqpTestSupport {
         System.setProperty("javax.net.ssl.keyStorePassword", "password");
         System.setProperty("javax.net.ssl.keyStoreType", "jks");
 
-        ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
+        ArrayList<BrokerPlugin> plugins = new ArrayList<>();
 
         addAdditionalPlugins(plugins);
 
@@ -182,28 +182,28 @@ public class AmqpTestSupport {
         }
         if (isUseTcpConnector()) {
             connector = brokerService.addConnector(
-                "amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer()
+ getAdditionalConfig());
+                "amqp://0.0.0.0:" + amqpPort + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpPort = connector.getConnectUri().getPort();
             amqpURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp port " + amqpPort);
         }
         if (isUseSslConnector()) {
             connector = brokerService.addConnector(
-                "amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer()
+ getAdditionalConfig());
+                "amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpSslPort = connector.getConnectUri().getPort();
             amqpSslURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+ssl port " + amqpSslPort);
         }
         if (isUseNioConnector()) {
             connector = brokerService.addConnector(
-                "amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer()
+ getAdditionalConfig());
+                "amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpNioPort = connector.getConnectUri().getPort();
             amqpNioURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+nio port " + amqpNioPort);
         }
         if (isUseNioPlusSslConnector()) {
             connector = brokerService.addConnector(
-                "amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
+                "amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpNioPlusSslPort = connector.getConnectUri().getPort();
             amqpNioPlusSslURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
@@ -238,14 +238,14 @@ public class AmqpTestSupport {
         }
         if (isUseWsConnector()) {
             connector = brokerService.addConnector(
-                "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" +
getAmqpTransformer() + getAdditionalConfig());
+                "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpWsPort = connector.getConnectUri().getPort();
             amqpWsURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+ws port " + amqpWsPort);
         }
         if (isUseWssConnector()) {
             connector = brokerService.addConnector(
-                "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
+                "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.tcpNoDelay=true&transport.transformer="
+ getAmqpTransformer() + getAdditionalConfig());
             amqpWssPort = connector.getConnectUri().getPort();
             amqpWssURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+wss port " + amqpWssPort);

http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index d28ac8e..da0c4e6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -433,6 +433,78 @@ public class AmqpMessage {
     }
 
     /**
+     * Sets the priority header on the outgoing message.
+     *
+     * @param priority the priority value to set.
+     */
+    public void setPriority(short priority) {
+       checkReadOnly();
+       lazyCreateHeader();
+       getWrappedMessage().setPriority(priority);
+    }
+
+    /**
+     * Gets the priority header on the message.
+     */
+    public short getPriority() {
+       return getWrappedMessage().getPriority();
+    }
+
+    /**
+     * Sets the ttl header on the outgoing message.
+     *
+     * @param timeToLive the ttl value to set.
+     */
+    public void setTimeToLive(long timeToLive) {
+       checkReadOnly();
+       lazyCreateHeader();
+       getWrappedMessage().setTtl(timeToLive);
+    }
+
+    /**
+     * Gets the ttl header on the message.
+     */
+    public long getTimeToLive() {
+       return getWrappedMessage().getTtl();
+    }
+
+    /**
+     * Sets the absolute expiration time property on the message.
+     *
+     * @param absoluteExpiryTime the expiration time value to set.
+     */
+    public void setAbsoluteExpiryTime(long absoluteExpiryTime) {
+       checkReadOnly();
+       lazyCreateProperties();
+       getWrappedMessage().setExpiryTime(absoluteExpiryTime);
+    }
+
+    /**
+     * Gets the absolute expiration time property on the message.
+     */
+    public long getAbsoluteExpiryTime() {
+       return getWrappedMessage().getExpiryTime();
+    }
+
+    /**
+     * Sets the creation time property on the message.
+     *
+     * @param creationTime the time value to set.
+     */
+    public void setCreationTime(long creationTime) {
+       checkReadOnly();
+       lazyCreateProperties();
+       getWrappedMessage().setCreationTime(creationTime);
+    }
+
+    /**
+     * Gets the creation time property on the message.
+     */
+    public long getCreationTime() {
+       return getWrappedMessage().getCreationTime();
+    }
+
+    /**
      * Sets a given application property on an outbound message.
      *
      * @param key
@@ -615,21 +687,21 @@ public class AmqpMessage {
 
     private void lazyCreateMessageAnnotations() {
         if (messageAnnotationsMap == null) {
-            messageAnnotationsMap = new HashMap<Symbol,Object>();
+            messageAnnotationsMap = new HashMap<>();
             message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
         }
     }
 
     private void lazyCreateDeliveryAnnotations() {
         if (deliveryAnnotationsMap == null) {
-            deliveryAnnotationsMap = new HashMap<Symbol,Object>();
+            deliveryAnnotationsMap = new HashMap<>();
             message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
         }
     }
 
     private void lazyCreateApplicationProperties() {
         if (applicationPropertiesMap == null) {
-            applicationPropertiesMap = new HashMap<String, Object>();
+            applicationPropertiesMap = new HashMap<>();
             message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java
new file mode 100644
index 0000000..902ca55
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      // Broker doesn't track messages that arrived already expired.
+      assertEquals(0, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
+      assertNull(received);
+
+      // Broker doesn't track messages that arrived already expired.
+      assertEquals(0, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+
+      assertEquals(0, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception
{
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
+      // AET should override any TTL set
+      message.setTimeToLive(60000);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      // Broker doesn't track messages that arrived already expired.
+      assertEquals(0, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
+      assertNull(received);
+
+      // Broker doesn't track messages that arrived already expired.
+      assertEquals(0, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsExpiredUsingTTLWhenAbsoluteIsZero() throws Exception
{
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAbsoluteExpiryTime(0);
+      // AET should override any TTL set unless it is zero
+      message.setTimeToLive(1000);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getQueueSize());
+
+      Thread.sleep(1000);
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
+      assertNull(received);
+
+      assertEquals(1, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception
{
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
+      // AET should override any TTL set
+      message.setTimeToLive(10);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      Thread.sleep(50);
+
+      assertEquals(1, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+
+      assertEquals(0, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setTimeToLive(5000);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      assertEquals(1, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+
+      assertEquals(0, queueView.getExpiredCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = trackConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+
+      // Get the Queue View early to avoid racing the delivery.
+      final QueueViewMBean queueView = getProxyToQueue(getTestName());
+      assertNotNull(queueView);
+
+      AmqpMessage message = new AmqpMessage();
+      message.setTimeToLive(10);
+      message.setText("Test-Message");
+      sender.send(message);
+      sender.close();
+
+      Thread.sleep(50);
+
+      assertEquals(1, queueView.getQueueSize());
+
+      // Now try and get the message
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
+      assertNull(received);
+
+      assertEquals(1, queueView.getExpiredCount());
+
+      connection.close();
+   }
+}


Mime
View raw message