activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r944163 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/usecases/
Date Fri, 14 May 2010 09:08:07 GMT
Author: gtully
Date: Fri May 14 09:08:07 2010
New Revision: 944163

URL: http://svn.apache.org/viewvc?rev=944163&view=rev
Log:
make keepAlive optional for inactivity monitor, useKeepAlive=false option, such that it can
be used to abort slow consumers

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=944163&r1=944162&r2=944163&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Fri May 14 09:08:07 2010
@@ -66,6 +66,7 @@ public class InactivityMonitor extends T
     private long readCheckTime;
     private long writeCheckTime;
     private long initialDelayTime;
+    private boolean useKeepAlive = true;
     private boolean keepAliveResponseRequired;
     private WireFormat wireFormat;
 
@@ -129,7 +130,7 @@ public class InactivityMonitor extends T
             return;
         }
 
-        if (!commandSent.get()) {
+        if (!commandSent.get() && useKeepAlive) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
             }
@@ -258,6 +259,10 @@ public class InactivityMonitor extends T
     public void setKeepAliveResponseRequired(boolean val) {
         keepAliveResponseRequired = val;
     }
+    
+    public void setUseKeepAlive(boolean val) {
+        useKeepAlive = val;
+    }
 
     public void setIgnoreRemoteWireFormat(boolean val) {
         ignoreRemoteWireFormat = val;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java?rev=944163&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java Fri May 14 09:08:07 2010
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.URISupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DurableConsumerCloseAndReconnectTcpTest extends DurableConsumerCloseAndReconnectTest
+implements ExceptionListener, TransportListener {
+    private static final Log LOG = LogFactory.getLog(DurableConsumerCloseAndReconnectTcpTest.class);
+    
+    private BrokerService broker;
+    private TransportConnector connector;
+
+    private CountDownLatch gotException = new CountDownLatch(1);
+
+    private Exception reconnectException;
+
+    private boolean reconnectInExceptionListener;
+
+    private boolean reconnectInTransportListener;
+    
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        // let the client initiate the inactivity timeout
+        connector = broker.addConnector("tcp://localhost:0?transport.useInactivityMonitor=false");
+        broker.setPersistent(false);
+        broker.start();
+        
+        class SlowCloseSocketTcpTransportFactory extends TcpTransportFactory {
+
+            class SlowCloseSocketFactory extends SocketFactory {
+                
+                class SlowCloseSocket extends Socket {
+                    public SlowCloseSocket(String host, int port) throws IOException {
+                        super(host, port);
+                    }
+
+                    public SlowCloseSocket(InetAddress host, int port) throws IOException {
+                        super(host, port);
+                    }
+
+                    public SlowCloseSocket(String host, int port, InetAddress localHost, int localPort) throws IOException {
+                        super(host, port, localHost, localPort);
+                    }
+
+                    public SlowCloseSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
+                        super(address, port, localAddress, localPort);
+                    }
+
+                    @Override
+                    public synchronized void close() throws IOException {
+                        LOG.info("delaying close");
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(500);
+                        } catch (InterruptedException e) {
+                            // TODO Auto-generated catch block
+                            e.printStackTrace();
+                        }
+                        super.close();
+                    }
+                    
+                    
+                }
+                @Override
+                public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+                    return new SlowCloseSocket(host, port);
+                }
+
+                @Override
+                public Socket createSocket(InetAddress host, int port) throws IOException {
+                    return new SlowCloseSocket(host, port);
+                }
+
+                @Override
+                public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException,
+                        UnknownHostException {
+                    return new SlowCloseSocket(host, port, localHost, localPort);
+                }
+
+                @Override
+                public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
+                    return new SlowCloseSocket(address, port, localAddress, localPort);
+                }
+                
+            }
+            @Override
+            protected SocketFactory createSocketFactory() throws IOException {
+                return new SlowCloseSocketFactory();
+            }
+            
+        }
+        
+        TransportFactory.registerTransportFactory("tcp", new SlowCloseSocketTcpTransportFactory());
+        
+    }
+    
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(URISupport.removeQuery(connector.getConnectUri()) + "?useKeepAlive=false&wireFormat.maxInactivityDuration=2000");
+    }
+
+    @Override
+    public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
+        reconnectInExceptionListener = true;
+        makeConsumer();
+        connection.setExceptionListener(this);
+        ((ActiveMQConnection)connection).addTransportListener(this);
+        assertTrue("inactive connection timedout", gotException.await(30, TimeUnit.SECONDS));
+        assertNotNull("Got expected exception on close reconnect overlap: " + reconnectException, reconnectException);
+    }
+
+    
+    public void testCreateDurableConsumerSlowCloseThenReconnectTransportListener() throws Exception {
+        reconnectInTransportListener = true;
+        makeConsumer();
+        connection.setExceptionListener(this);
+        ((ActiveMQConnection)connection).addTransportListener(this);
+        assertTrue("inactive connection timedout", gotException.await(30, TimeUnit.SECONDS));
+        assertNull("No exception: " + reconnectException, reconnectException);
+    }
+    
+    public void onException(JMSException exception) {
+        LOG.info("Exception listener exception:" + exception);
+        if (reconnectInExceptionListener) {
+            try {
+                makeConsumer();
+            } catch (Exception e) {
+                reconnectException = e;
+            }
+        
+            gotException.countDown();
+        }
+    }
+
+    public void onCommand(Object command) {}
+
+    public void onException(IOException error) {
+       LOG.info("Transport listener exception:" + error);
+       if (reconnectInTransportListener) {
+           try {
+               makeConsumer();
+           } catch (Exception e) {
+               reconnectException = e;
+           }
+       
+           gotException.countDown();
+       }
+    }
+
+    public void transportInterupted() {}
+
+    public void transportResumed() {}
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTcpTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java?rev=944163&r1=944162&r2=944163&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java Fri May 14 09:08:07 2010
@@ -39,7 +39,7 @@ public class DurableConsumerCloseAndReco
     protected static final long RECEIVE_TIMEOUT = 5000L;
     private static final Log LOG = LogFactory.getLog(DurableConsumerCloseAndReconnectTest.class);
 
-    private Connection connection;
+    protected Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private MessageProducer producer;



Mime
View raw message