activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r638910 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/ main/java/org/apache/activemq/openwire/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/a...
Date Wed, 19 Mar 2008 16:07:57 GMT
Author: rajdavies
Date: Wed Mar 19 09:07:54 2008
New Revision: 638910

URL: http://svn.apache.org/viewvc?rev=638910&view=rev
Log:
Added separately configurable initial delay for timeout tasks on InactivityMonitor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
Wed Mar 19 09:07:54 2008
@@ -259,9 +259,20 @@
         return l == null ? 0 : l.longValue();
     }
 
-    public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
         setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
     }
+    
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay)
throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
+    }
+    
+   
 
     /**
      * @throws IOException

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
Wed Mar 19 09:07:54 2008
@@ -36,7 +36,8 @@
     private boolean cacheEnabled = true;
     private boolean tightEncodingEnabled = true;
     private boolean sizePrefixDisabled;
-    private long maxInactivityDuration = 30 * 1000;
+    private long maxInactivityDuration = 30*1000;
+    private long maxInactivityDurationInitalDelay = 10*1000;
     private int cacheSize = 1024;
 
     public WireFormat createWireFormat() {
@@ -49,7 +50,8 @@
             info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
             info.setTightEncodingEnabled(tightEncodingEnabled);
             info.setSizePrefixDisabled(sizePrefixDisabled);
-            info.seMaxInactivityDuration(maxInactivityDuration);
+            info.setMaxInactivityDuration(maxInactivityDuration);
+            info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
             info.setCacheSize(cacheSize);
         } catch (Exception e) {
             IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
@@ -124,5 +126,14 @@
 
     public void setCacheSize(int cacheSize) {
         this.cacheSize = cacheSize;
+    }
+
+    public long getMaxInactivityDurationInitalDelay() {
+        return maxInactivityDurationInitalDelay;
+    }
+
+    public void setMaxInactivityDurationInitalDelay(
+            long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Wed Mar 19 09:07:54 2008
@@ -23,6 +23,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
@@ -51,6 +52,7 @@
     private final AtomicBoolean commandSent = new AtomicBoolean(false);
     private final AtomicBoolean inSend = new AtomicBoolean(false);
     private final AtomicBoolean failed = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
 
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
@@ -59,6 +61,7 @@
     
     private long readCheckTime;
     private long writeCheckTime;
+    private long initialDelayTime;
     
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
@@ -107,7 +110,7 @@
     }
 
     public void stop() throws Exception {
-        stopMonitorThreads();
+        closeDown();
         next.stop();
     }
 
@@ -125,12 +128,15 @@
             }
             ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
-                    try {
-                        KeepAliveInfo info = new KeepAliveInfo();
-                        info.setResponseRequired(true);
-                        oneway(info);
-                    } catch (IOException e) {
-                        onException(e);
+                    if (stopped.get() == false) {
+                        try {
+
+                            KeepAliveInfo info = new KeepAliveInfo();
+                            info.setResponseRequired(true);
+                            oneway(info);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
                     }
                 };
             });
@@ -155,9 +161,10 @@
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No message received since last read check for " + toString()
+ "! Throwing InactivityIOException.");
                 }
+                closeDown();
                 ASYNC_TASKS.execute(new Runnable() {  
                     public void run() {
-                    	handleException(new InactivityIOException("Channel was inactive for
too long: "+next.getRemoteAddress()));
+                        onException(new InactivityIOException("Channel was inactive for too
long: "+next.getRemoteAddress()));
                     };
                     
                 });
@@ -218,15 +225,17 @@
         synchronized(inSend) {
             inSend.set(true);
             try {
+                
+                if( failed.get() ) {
+                    closeDown();
+                    throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
+                }
                 if (o.getClass() == WireFormatInfo.class) {
                     synchronized (this) {
                         localWireFormatInfo = (WireFormatInfo)o;
                         startMonitorThreads();
                     }
                 }
-                if( failed.get() ) {
-                    throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
-                }
                 next.oneway(o);
             } finally {
                 commandSent.set(true);
@@ -236,17 +245,18 @@
     }
 
     public void onException(IOException error) {
-    	if( !failed.getAndSet(true) ) {
-	        handleException(error);
-    	}
+        closeDown();
+        if (!failed.getAndSet(true)) {
+            transportListener.onException(error);
+        }
+    }
+    	
+	private void closeDown() {
+        stopped.set(true);
+        if (monitorStarted.get()) {
+            stopMonitorThreads();
+        }
     }
-
-	private void handleException(IOException error) {
-		if (monitorStarted.get()) {
-		    stopMonitorThreads();
-		}
-		transportListener.onException(error);
-	}
 
     private synchronized void startMonitorThreads() throws IOException {
         if (monitorStarted.get()) {
@@ -260,6 +270,7 @@
         }
 
         readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+        initialDelayTime =  Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(),
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
         if (readCheckTime > 0) {
             monitorStarted.set(true);
             writeCheckerTask = new SchedulerTimerTask(writeChecker);
@@ -271,8 +282,8 @@
             	    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck");
             	}
             	CHECKER_COUNTER++;
-                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, writeCheckTime,writeCheckTime);
-                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, readCheckTime,readCheckTime);
+                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
+                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
Wed Mar 19 09:07:54 2008
@@ -45,6 +45,7 @@
 	}
 	public void setTransport(Transport transport) {
 		this.transport = transport;
+		this.transport.setTransportListener(this);
 	}
 	public URI getUri() {
 		return uri;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Wed Mar 19 09:07:54 2008
@@ -107,9 +107,12 @@
             public boolean iterate() {
             	boolean result=false;
             	boolean buildBackup=true;
-            	if (connectedTransport.get()==null && !disposed) {
-            		result=doReconnect();
-            		buildBackup=false;
+            	boolean doReconnect = !disposed;
+            	synchronized(backupMutex) {
+                	if (connectedTransport.get()==null && !disposed) {
+                		result=doReconnect();
+                		buildBackup=false;
+                	}
             	}
             	if(buildBackup) {
             		buildBackups();
@@ -253,6 +256,10 @@
             started = false;
             disposed = true;
             connected = false;
+            for (BackupTransport t:backups) {
+                t.setDisposed(true);
+            }
+            backups.clear();
 
             if (connectedTransport.get() != null) {
                 transportToStop = connectedTransport.getAndSet(null);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Wed Mar 19 09:07:54 2008
@@ -186,7 +186,7 @@
         });
         clientTransport.start();
         WireFormatInfo info = new WireFormatInfo();
-        info.seMaxInactivityDuration(1000);
+        info.setMaxInactivityDuration(1000);
         clientTransport.oneway(info);
 
         assertEquals(0, serverErrorCount.get());



Mime
View raw message