activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6262
Date Mon, 25 Apr 2016 21:42:15 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 2796c0f5d -> 0c3d05f2e


https://issues.apache.org/jira/browse/AMQ-6262

Ensure that the connection check task is stopped once commands pass
through the inactivity monitor to prevent the transport from being
closed for no reason.
(cherry picked from commit e47edd7a282e1391f480c1278555f4a86e9a8ea9)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0c3d05f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0c3d05f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0c3d05f2

Branch: refs/heads/activemq-5.13.x
Commit: 0c3d05f2eac40902fc252cb6cff4997f00b1b053
Parents: 2796c0f
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Apr 25 17:05:09 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Apr 25 17:05:45 2016 -0400

----------------------------------------------------------------------
 .../transport/http/HttpInactivityMonitor.java   | 58 ++++++++++++
 .../transport/http/HttpTransportFactory.java    |  9 +-
 .../http/HttpTransportConnectTimeoutTest.java   | 93 ++++++++++++++++++++
 .../src/test/resources/log4j.properties         |  1 +
 4 files changed, 156 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0c3d05f2/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java
new file mode 100644
index 0000000..e6b61b0
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.http;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inactivity Monitor specialization for use with HTTP based transports.
+ */
+public class HttpInactivityMonitor extends InactivityMonitor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpInactivityMonitor.class);
+
+    /**
+     * @param next
+     *      The next Transport in the filter chain.
+     */
+    public HttpInactivityMonitor(Transport next) {
+        super(next, null);
+    }
+
+    @Override
+    public void onCommand(Object command) {
+        if (command.getClass() == ConnectionInfo.class || command.getClass() == BrokerInfo.class) {
+            synchronized (this) {
+                try {
+                    LOG.trace("Connection {} attempted on HTTP based transport: {}", command, this);
+                    processInboundWireFormatInfo(null);
+                } catch (IOException e) {
+                    onException(e);
+                }
+            }
+        }
+
+        super.onCommand(command);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0c3d05f2/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
index 13b19c0..8332a95 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportLoggerFactory;
@@ -60,7 +59,7 @@ public class HttpTransportFactory extends TransportFactory {
         if (wireFormat instanceof TextWireFormat) {
             return (TextWireFormat)wireFormat;
         }
-        LOG.trace("Not created with a TextWireFormat: " + wireFormat);
+        LOG.trace("Not created with a TextWireFormat: {}", wireFormat);
         return new XStreamWireFormat();
     }
 
@@ -94,8 +93,8 @@ public class HttpTransportFactory extends TransportFactory {
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = super.compositeConfigure(transport, format, options);
-        HttpClientTransport httpTransport = (HttpClientTransport)transport.narrow(HttpClientTransport.class);
-        if(httpTransport != null && httpTransport.isTrace() ) {
+        HttpClientTransport httpTransport = transport.narrow(HttpClientTransport.class);
+        if (httpTransport != null && httpTransport.isTrace()) {
             try {
                 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
             } catch (Throwable e) {
@@ -104,7 +103,7 @@ public class HttpTransportFactory extends TransportFactory {
         }
         boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
         if (useInactivityMonitor) {
-            transport = new InactivityMonitor(transport, null /* ignore wire format as no negotiation over http */);
+            transport = new HttpInactivityMonitor(transport);
             IntrospectionSupport.setProperties(transport, options);
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0c3d05f2/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java
new file mode 100644
index 0000000..a8f3d6d
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.http;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HttpTransportConnectTimeoutTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HttpTransportConnectTimeoutTest.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector(
+            "http://localhost:0?trace=true&transport.connectAttemptTimeout=2000");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+
+        String connectionUri = connector.getPublishableConnectString();
+        factory = new ActiveMQConnectionFactory(connectionUri + "?trace=true");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendReceiveAfterPause() throws Exception {
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        Connection connection = factory.createConnection();
+        connection.start();
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                LOG.info("Connection failed due to: {}", exception.getMessage());
+                failed.countDown();
+            }
+        });
+
+        assertFalse(failed.await(3, TimeUnit.SECONDS));
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createTemporaryQueue();
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        producer.send(session.createMessage());
+
+        assertNotNull(consumer.receive(5000));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0c3d05f2/activemq-http/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties
index f28c2a8..aa64270 100755
--- a/activemq-http/src/test/resources/log4j.properties
+++ b/activemq-http/src/test/resources/log4j.properties
@@ -21,6 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.activemq.transport.ws=DEBUG
+log4j.logger.org.apache.activemq.transport.http=DEBUG
 
 #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG


Mime
View raw message