activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r905769 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Date Tue, 02 Feb 2010 20:10:40 GMT
Author: tabish
Date: Tue Feb  2 20:10:39 2010
New Revision: 905769

URL: http://svn.apache.org/viewvc?rev=905769&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2196

Add an option to ignore the values in the remote WireFormatInfo, in case the user wants
control over the Broker's inactivity timeouts.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=905769&r1=905768&r2=905769&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Feb  2 20:10:39 2010
@@ -35,18 +35,18 @@
 /**
  * Used to make sure that commands are arriving periodically from the peer of
  * the transport.
- * 
+ *
  * @version $Revision$
  */
 public class InactivityMonitor extends TransportFilter {
 
     private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
     private static final ThreadPoolExecutor ASYNC_TASKS;
-    
+
     private static int CHECKER_COUNTER;
     private static Timer  READ_CHECK_TIMER;
     private static Timer  WRITE_CHECK_TIMER;
-    
+
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
     private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
@@ -58,16 +58,17 @@
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
     private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
-    
+
     private SchedulerTimerTask writeCheckerTask;
     private SchedulerTimerTask readCheckerTask;
-    
+
+    private boolean ignoreRemoteWireFormat = false;
     private long readCheckTime;
     private long writeCheckTime;
     private long initialDelayTime;
     private boolean keepAliveResponseRequired;
     private WireFormat wireFormat;
-    
+
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
         public void run() {
@@ -77,22 +78,22 @@
             if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
                 LOG.debug(""+elapsed+" ms elapsed since last read check.");
             }
-            
+
             // Perhaps the timer executed a read check late.. and then executes
             // the next read check on time which causes the time elapsed between
             // read checks to be small..
-            
-            // If less than 90% of the read check Time elapsed then abort this readcheck.

+
+            // If less than 90% of the read check Time elapsed then abort this readcheck.
             if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
                 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
                 return;
             }
-            
+
             lastRunTime = now;
             readCheck();
         }
     };
-    
+
     private boolean allowReadCheck(long elapsed) {
         return elapsed > (readCheckTime * 9 / 10);
     }
@@ -103,9 +104,9 @@
             long now = System.currentTimeMillis();
             if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
                 LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
-                
+
             }
-            lastRunTime = now; 
+            lastRunTime = now;
             writeCheck();
         }
     };
@@ -116,7 +117,7 @@
     }
 
     public void stop() throws Exception {
-    	stopMonitorThreads();
+        stopMonitorThreads();
         next.stop();
     }
 
@@ -168,11 +169,11 @@
             if (LOG.isDebugEnabled()) {
                 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
             }
-            ASYNC_TASKS.execute(new Runnable() {  
+            ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
                     onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
                 };
-                
+
             });
         } else {
             if (LOG.isTraceEnabled()) {
@@ -216,7 +217,7 @@
                 }
             }
         } finally {
-            
+
             inReceive.set(false);
         }
     }
@@ -224,12 +225,12 @@
     public void oneway(Object o) throws IOException {
         // Disable inactivity monitoring while processing a command.
         //synchronize this method - its not synchronized
-        //further down the transport stack and gets called by more 
+        //further down the transport stack and gets called by more
         //than one thread  by this class
         synchronized(inSend) {
             inSend.set(true);
             try {
-                
+
                 if( failed.get() ) {
                     throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress());
                 }
@@ -252,12 +253,16 @@
             stopMonitorThreads();
             transportListener.onException(error);
         }
-    }   
-    
+    }
+
     public void setKeepAliveResponseRequired(boolean val) {
         keepAliveResponseRequired = val;
     }
 
+    public void setIgnoreRemoteWireFormat(boolean val) {
+        ignoreRemoteWireFormat = val;
+    }
+
     private synchronized void startMonitorThreads() throws IOException {
         if (monitorStarted.get()) {
             return;
@@ -269,19 +274,25 @@
             return;
         }
 
-        readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
-        initialDelayTime =  Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+        if (!ignoreRemoteWireFormat) {
+            readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+            initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+        } else {
+            readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
+            initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
+        }
+
         if (readCheckTime > 0) {
             monitorStarted.set(true);
             writeCheckerTask = new SchedulerTimerTask(writeChecker);
             readCheckerTask = new  SchedulerTimerTask(readChecker);
             writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
             synchronized( InactivityMonitor.class ) {
-            	if( CHECKER_COUNTER == 0 ) {
-            	    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
-            	    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
-            	}
-            	CHECKER_COUNTER++;
+                if( CHECKER_COUNTER == 0 ) {
+                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
+                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
+                }
+                CHECKER_COUNTER++;
                 WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
                 READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
             }
@@ -296,20 +307,20 @@
             readCheckerTask.cancel();
             writeCheckerTask.cancel();
             synchronized( InactivityMonitor.class ) {
-	            WRITE_CHECK_TIMER.purge();
-	            READ_CHECK_TIMER.purge();
-	            CHECKER_COUNTER--;
-	            if(CHECKER_COUNTER==0) {
-	            	WRITE_CHECK_TIMER.cancel();
-	            	READ_CHECK_TIMER.cancel();
-	            	WRITE_CHECK_TIMER = null;
-	            	READ_CHECK_TIMER = null;
-	            }
+                WRITE_CHECK_TIMER.purge();
+                READ_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if(CHECKER_COUNTER==0) {
+                    WRITE_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER.cancel();
+                    WRITE_CHECK_TIMER = null;
+                    READ_CHECK_TIMER = null;
+                }
             }
         }
     }
-    
-       
+
+
     static {
         ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {



Mime
View raw message