activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1205011 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/
Date Tue, 22 Nov 2011 14:20:05 GMT
Author: gtully
Date: Tue Nov 22 14:20:04 2011
New Revision: 1205011

URL: http://svn.apache.org/viewvc?rev=1205011&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3603 - STOMP 1.1 introduced the heartBeat header
implemented by the inactivity monitor, would be nice to have this option for stomp 1.0. Implement
support for ?transport.defaultHeartBeat=5000,0 with test

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=1205011&r1=1205010&r2=1205011&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Tue Nov 22 14:20:04 2011
@@ -71,12 +71,16 @@ public class InactivityMonitor extends A
         long readCheckTime = getReadCheckTime();
 
         if (readCheckTime > 0) {
-            setWriteCheckTime(readCheckTime>3 ? readCheckTime/3 : readCheckTime);
+            setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
         }
 
         super.startMonitorThreads();
     }
 
+    private long writeCheckValueFromReadCheck(long readCheckTime) {
+        return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
+    }
+
     @Override
     protected boolean configuredOk() throws IOException {
         boolean configured = false;
@@ -89,7 +93,7 @@ public class InactivityMonitor extends A
                 }
 
                 long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(),
remoteWireFormatInfo.getMaxInactivityDuration());
-                long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
+                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
 
                 setReadCheckTime(readCheckTime);
                 setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(),
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
@@ -101,7 +105,7 @@ public class InactivityMonitor extends A
                 }
 
                 long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
-                long writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
+                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
 
                 setReadCheckTime(readCheckTime);
                 setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1205011&r1=1205010&r2=1205011&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Tue Nov 22 14:20:04 2011
@@ -82,10 +82,6 @@ public class ProtocolConverter {
     private static final String BROKER_VERSION;
     private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
 
-    private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100;
-    private static final long DEFAULT_INBOUND_HEARTBEAT = 1000;
-    private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000;
-
     static {
         InputStream in = null;
         String version = "5.6.0";
@@ -123,8 +119,9 @@ public class ProtocolConverter {
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
     private final BrokerContext brokerContext;
     private String version = "1.0";
-    private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT;
-    private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT;
+    private long hbReadInterval;
+    private long hbWriteInterval;
+    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
 
     public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext)
{
         this.stompTransport = stompTransport;
@@ -620,7 +617,7 @@ public class ProtocolConverter {
             accepts = Stomp.DEFAULT_VERSION;
         }
         if (heartBeat == null) {
-            heartBeat = Stomp.DEFAULT_HEART_BEAT;
+            heartBeat = defaultHeartBeat;
         }
 
         HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
@@ -793,28 +790,27 @@ public class ProtocolConverter {
         return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
     }
 
+    public String getDefaultHeartBeat() {
+        return defaultHeartBeat;
+    }
+
+    public void setDefaultHeartBeat(String defaultHeartBeat) {
+        this.defaultHeartBeat = defaultHeartBeat;
+    }
+
     protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException
{
 
         String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
 
         if (keepAliveOpts == null || keepAliveOpts.length != 2) {
-            throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
+            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
         } else {
 
             try {
                 hbReadInterval = Long.parseLong(keepAliveOpts[0]);
                 hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
             } catch(NumberFormatException e) {
-                throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig,
true);
-            }
-
-            if (hbReadInterval > 0) {
-                hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval);
-                hbReadInterval += Math.min(hbReadInterval, 5000);
-            }
-
-            if (hbWriteInterval > 0) {
-                hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval);
+                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig,
true);
             }
 
             try {
@@ -822,7 +818,7 @@ public class ProtocolConverter {
                 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
 
                 monitor.setReadCheckTime(hbReadInterval);
-                monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY);
+                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
                 monitor.setWriteCheckTime(hbWriteInterval);
 
                 monitor.startMonitoring();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1205011&r1=1205010&r2=1205011&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
Tue Nov 22 14:20:04 2011
@@ -130,4 +130,13 @@ public class StompTransportFilter extend
     public StompWireFormat getWireFormat() {
         return this.wireFormat;
     }
+
+    public String getDefaultHeartBeat() {
+        return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null;
+    }
+
+    public void setDefaultHeartBeat(String defaultHeartBeat) {
+        protocolConverter.setDefaultHeartBeat(defaultHeartBeat);
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java?rev=1205011&r1=1205010&r2=1205011&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
Tue Nov 22 14:20:04 2011
@@ -136,4 +136,43 @@ public class ConnectTest {
         socket.close();
         assertTrue("no exceptions", exceptions.isEmpty());
     }
+
+    @Test
+    public void testInactivityMonitor() throws Exception {
+
+        brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false");
+        brokerService.start();
+
+        Thread t1 = new Thread() {
+            StompConnection connection = new StompConnection();
+
+            public void run() {
+                try {
+                    connection.open("localhost",  brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
+                    connection.connect("system", "manager");
+                } catch (Exception ex) {
+                    LOG.error("unexpected exception on connect/disconnect", ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+
+        t1.run();
+
+        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+                 @Override
+                 public boolean isSatisified() throws Exception {
+                     return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
+                 }
+             }));
+
+        // and it should be closed due to inactivity
+        assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
+            }
+        }));
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
 }
\ No newline at end of file



Mime
View raw message