activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/2] activemq git commit: Avoid overflow errors - see KAFKA-4290
Date Fri, 27 Jul 2018 12:40:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x cdae25ecf -> da054951c


Avoid overflow errors - see KAFKA-4290

Add test for overflow values

Bind test sockets to free ports

(cherry picked from commit 5b1412ddfd1c83ae595bdb0543fbf92499bc3822)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2fe81168
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2fe81168
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2fe81168

Branch: refs/heads/activemq-5.15.x
Commit: 2fe81168190c8727686c0b2f6d6ea0f0ce2faec8
Parents: cdae25e
Author: giliva <giovanni.liva@aau.at>
Authored: Wed May 23 13:35:38 2018 +0200
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Jul 27 08:39:52 2018 -0400

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |   2 +-
 .../transport/vm/VMTransportFactory.java        |   2 +-
 .../activemq/ActiveMQMessageConsumer.java       |   2 +-
 .../activemq/jms/pool/ConnectionPool.java       |   4 +-
 .../activemq/jms/pool/ConnectionPoolTest.java   |  62 +++++++
 .../apache/activemq/tool/JmsConsumerClient.java |   2 +-
 .../apache/activemq/tool/JmsProducerClient.java |   8 +-
 .../DemandForwardingBridgeSupportTest.java      | 183 +++++++++++++++++++
 8 files changed, 255 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index dd7716f..394cccd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1923,7 +1923,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 if (info == null) {
                     long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
 
-                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
+                    while (!disposed.get() || System.currentTimeMillis() - deadline < 0) {
                         if (slot.await(1, TimeUnit.MILLISECONDS)) {
                             break;
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
index 0e4b140..56baaee 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
@@ -191,7 +191,7 @@ public class VMTransportFactory extends TransportFactory {
             broker = registry.lookup(brokerName);
             if (broker == null || waitForStart > 0) {
                 final long expiry = System.currentTimeMillis() + waitForStart;
-                while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) {
+                while ((broker == null || !broker.isStarted()) && System.currentTimeMillis() - expiry < 0) {
                     long timeout = Math.max(0, expiry - System.currentTimeMillis());
                     if (broker == null) {
                         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 0bf1ade..5c7015b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -1157,7 +1157,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                         break;
                     }
                 }
-            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
+            } while (numberNotReplayed > 0 && expiry - System.currentTimeMillis() < 0);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index a449f9f..08f8d7f 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -233,7 +233,7 @@ public class ConnectionPool implements ExceptionListener {
             }
         }
 
-        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+        if (expiryTimeout > 0 && (firstUsed + expiryTimeout) - System.currentTimeMillis() < 0) {
             hasExpired = true;
             if (referenceCount == 0) {
                 close();
@@ -243,7 +243,7 @@ public class ConnectionPool implements ExceptionListener {
 
         // Only set hasExpired here is no references, as a Connection with references is by
         // definition not idle at this time.
-        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
+        if (referenceCount == 0 && idleTimeout > 0 && (lastUsed + idleTimeout) - System.currentTimeMillis() < 0) {
             hasExpired = true;
             close();
             expired = true;

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionPoolTest.java
new file mode 100644
index 0000000..70d5de2
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionPoolTest.java
@@ -0,0 +1,62 @@
+package org.apache.activemq.jms.pool;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import static org.junit.Assert.assertFalse;
+
+public class ConnectionPoolTest extends JmsPoolTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionPoolTest.class);
+
+
+    private class PooledConnectionFactoryTest extends PooledConnectionFactory {
+        ConnectionPool pool = null;
+        @Override
+        protected Connection newPooledConnection(ConnectionPool connection) {
+            connection.setIdleTimeout(Integer.MAX_VALUE);
+            this.pool = connection;
+            Connection ret = super.newPooledConnection(connection);
+            ConnectionPool cp = ((PooledConnection) ret).pool;
+            cp.decrementReferenceCount();
+            // will fail if timeout does overflow
+            assertFalse(cp.expiredCheck());
+            return ret;
+        }
+
+        public ConnectionPool getPool() {
+            return pool;
+        }
+
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @Test(timeout = 120000)
+    public void demo() throws JMSException, InterruptedException {
+        final PooledConnectionFactoryTest pooled = new PooledConnectionFactoryTest();
+        pooled.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?create=false"));
+        pooled.setMaxConnections(2);
+        pooled.setExpiryTimeout(Long.MAX_VALUE);
+        pooled.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
index 3fc90a5..2ee01ab 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
@@ -84,7 +84,7 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
             LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
             long endTime = System.currentTimeMillis() + duration;
 
-            while (System.currentTimeMillis() < endTime) {
+            while (System.currentTimeMillis() - endTime < 0) {
                 getJmsConsumer().receive();
                 incThroughput();
                 sleep();

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
index 68ea5cb..eb6cd38 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
@@ -199,7 +199,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
 
                 // Send to more than one actual destination
                 if (dest.length > 1) {
-                    while (System.currentTimeMillis() < endTime) {
+                    while (System.currentTimeMillis() - endTime < 0) {
                         for (int j = 0; j < dest.length; j++) {
                             getJmsProducer().send(dest[j], getJmsTextMessage());
                             incThroughput();
@@ -209,7 +209,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
                     }
                     // Send to only one actual destination
                 } else {
-                    while (System.currentTimeMillis() < endTime) {
+                    while (System.currentTimeMillis() - endTime < 0) {
                         getJmsProducer().send(getJmsTextMessage());
                         incThroughput();
                         sleep();
@@ -224,7 +224,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
                 // Send to more than one actual destination
                 long count = 1;
                 if (dest.length > 1) {
-                    while (System.currentTimeMillis() < endTime) {
+                    while (System.currentTimeMillis() - endTime < 0) {
                         for (int j = 0; j < dest.length; j++) {
                             getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
                             incThroughput();
@@ -235,7 +235,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
 
                     // Send to only one actual destination
                 } else {
-                    while (System.currentTimeMillis() < endTime) {
+                    while (System.currentTimeMillis() - endTime < 0) {
 
                         getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
                         incThroughput();

http://git-wip-us.apache.org/repos/asf/activemq/blob/2fe81168/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeSupportTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeSupportTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeSupportTest.java
new file mode 100644
index 0000000..761865f
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeSupportTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.Test;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.*;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+
+public class DemandForwardingBridgeSupportTest extends NetworkTestSupport {
+
+    private DemandForwardingBridge bridge;
+
+    private StubConnection producerConnection;
+
+    private ProducerInfo producerInfo;
+
+    private StubConnection consumerConnection;
+
+    private SessionInfo consumerSessionInfo;
+
+
+    public void testOverflow() throws Exception {
+        NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
+
+        configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>",
+                ActiveMQDestination.TOPIC_TYPE)));
+        configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
+                "TEST", ActiveMQDestination.QUEUE_TYPE)));
+
+        configureAndStartBridge(configuration);
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, destinationType));
+
+        // Make sure the message was delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+
+        assertNotNull(m);
+    }
+
+    private void assertReceiveNoMessageOn(String destinationName, byte destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, destinationType));
+
+        // Make sure the message was delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+        assertNull(m);
+    }
+
+    private Message createConsumerAndReceiveMessage(ActiveMQDestination destination) throws Exception {
+        // Now create remote consumer that should cause message to move to this
+        // remote consumer.
+        ConsumerInfo consumerInfo = createConsumerInfo(consumerSessionInfo, destination);
+        consumerConnection.send(consumerInfo);
+
+        Message m = receiveMessage(consumerConnection);
+        return m;
+    }
+
+    private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception {
+        bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport());
+        bridge.setBrokerService(broker);
+        bridge.setDynamicallyIncludedDestinations(configuration.getDynamicallyIncludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]
+        ));
+        bridge.setExcludedDestinations(configuration.getExcludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getExcludedDestinations().size()]
+        ));
+        bridge.setStaticallyIncludedDestinations(configuration.getStaticallyIncludedDestinations().toArray(
+                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]
+        ));
+        bridge.start();
+    }
+
+    public NetworkBridgeConfiguration getDefaultBridgeConfiguration() {
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setBrokerName("local");
+        config.setDispatchAsync(false);
+        return config;
+    }
+
+    // create sockets with max waiting value accepted
+    @Override
+    protected String getLocalURI() {
+        int port = findFreePort();
+        return String.format("tcp://localhost:%d?connectionTimeout=2147483647", port);
+    }
+
+    @Override
+    protected String getRemoteURI() {
+        int port = findFreePort();
+        return String.format("tcp://localhost:%d?connectionTimeout=2147483647",port);
+    }
+
+    private static int findFreePort() {
+        ServerSocket socket = null;
+        try {
+            socket = new ServerSocket(0);
+            socket.setReuseAddress(true);
+            int port = socket.getLocalPort();
+            try {
+                socket.close();
+            } catch (IOException e) {
+                // Ignore IOException on close()
+            }
+            return port;
+        } catch (IOException e) {
+        } finally {
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (IOException e) {
+                }
+            }
+        }
+        throw new IllegalStateException("Could not find a free TCP/IP port to start embedded Jetty HTTP Server on");
+    }
+
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        producerConnection = createConnection();
+        ConnectionInfo producerConnectionInfo = createConnectionInfo();
+        SessionInfo producerSessionInfo = createSessionInfo(producerConnectionInfo);
+        producerInfo = createProducerInfo(producerSessionInfo);
+        producerConnection.send(producerConnectionInfo);
+        producerConnection.send(producerSessionInfo);
+        producerConnection.send(producerInfo);
+
+        consumerConnection = createRemoteConnection();
+        ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+        consumerSessionInfo = createSessionInfo(consumerConnectionInfo);
+        consumerConnection.send(consumerConnectionInfo);
+        consumerConnection.send(consumerSessionInfo);
+    }
+
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public static Test suite() {
+        return suite(DemandForwardingBridgeSupportTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}


Mime
View raw message