activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r382503 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/activeio/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/transport/tcp/
Date Thu, 02 Mar 2006 20:25:21 GMT
Author: chirino
Date: Thu Mar  2 12:25:19 2006
New Revision: 382503

URL: http://svn.apache.org/viewcvs?rev=382503&view=rev
Log:
- Implemented transport inactivity monitoring for the tcp transport
- The properties set on the tcp transport can now be set on the transport server, and the server
will configure the transports it creates with those properties.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Thu Mar  2 12:25:19 2006
@@ -18,9 +18,11 @@
 
 import java.io.IOException;
 
+import org.apache.activemq.command.Command;
 import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.thread.Scheduler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
@@ -31,25 +33,26 @@
  */
 public class InactivityMonitor extends TransportFilter implements Runnable {
 
+    private final Log log = LogFactory.getLog(InactivityMonitor.class);
+    
     private final long maxInactivityDuration;
     private final AtomicBoolean cancled = new AtomicBoolean(false);
-    private byte runIteration=0;
+    private byte readCheckIteration=0;
+
+    private final AtomicBoolean commandSent=new AtomicBoolean(true);
+    private final AtomicBoolean inSend=new AtomicBoolean(false);
 
-    private long lastReadCount;
-    private long lastWriteCount;
-    private final CountStatisticImpl readCounter;
-    private final CountStatisticImpl writeCounter;
+    private final AtomicBoolean commandReceived=new AtomicBoolean(true);
+    private final AtomicBoolean inReceive=new AtomicBoolean(false);
     
-    public InactivityMonitor(Transport next, long maxInactivityDuration, CountStatisticImpl
readCounter, CountStatisticImpl writeCounter ) {
+    public InactivityMonitor(Transport next, long maxInactivityDuration) {
         super(next);
         this.maxInactivityDuration = maxInactivityDuration;
-        this.readCounter = readCounter;
-        this.writeCounter = writeCounter;
     }
-    
+        
     public void start() throws Exception {
         next.start();
-        Scheduler.executePeriodically(this, maxInactivityDuration/5);
+        Scheduler.executePeriodically(this, maxInactivityDuration/2);
     }
     
     public void stop() throws Exception {
@@ -60,33 +63,74 @@
     }
     
     public void run() {
-        
-        switch(runIteration) {
+        switch(readCheckIteration) {
+        case 0:
+            writeCheck();
+            readCheckIteration++;
         case 1:
-        case 2:
-            long wc = writeCounter.getCount();
-            if( wc==lastWriteCount ) {
-                try {
-                    oneway(new KeepAliveInfo());
-                } catch (IOException e) {
-                    onException(e);
-                }
-            } else {
-                lastWriteCount = wc;
-            }
+            readCheck();
+            writeCheck();
+            readCheckIteration=0;
             break;
-        case 4:
-            long rc = readCounter.getCount();
-            if( rc == lastReadCount ) {
-                onException(new InactivityIOException("Channel was inactive for too long."));
-            } else {
-                lastReadCount = rc;
+        }        
+    }
+    
+    private void writeCheck() {
+        if( inSend.get() ) {
+            log.debug("A send is in progress");
+            return;
+        }
+        
+        if( !commandSent.get() ) {
+            log.debug("No message sent since last write check, sending a KeepAliveInfo");
+            try {
+                next.oneway(new KeepAliveInfo());
+            } catch (IOException e) {
+                onException(e);
             }
+        } else {
+            log.debug("Message sent since last write check, resetting flag");
         }
         
-        runIteration++;
-        if(runIteration>=5)
-            runIteration=0;
+        commandSent.set(false);
+        
+    }
+
+    private void readCheck() {
+        if( inReceive.get() ) {
+            log.debug("A receive is in progress");
+            return;
+        }
+        
+        if( !commandReceived.get() ) {
+            log.debug("No message received since last read check!");
+            onException(new InactivityIOException("Channel was inactive for too long."));
          
+        } else {
+            log.debug("Message received since last read check, resetting flag");
+        }
+        
+        commandReceived.set(false);
+    }
+
+    public void onCommand(Command command) {
+        inReceive.set(true);
+        try {
+            commandListener.onCommand(command);
+        } finally {
+            inReceive.set(false);
+            commandReceived.set(true);
+        }
+    }
+    
+    public void oneway(Command command) throws IOException {
+        // Disable inactivity monitoring while processing a command.
+        inSend.set(true);
+        commandSent.set(true);
+        try {
+            next.oneway(command);
+        } finally {
+            inSend.set(false);
+        }
     }
     
     public void onException(IOException error) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportLogger.java
Thu Mar  2 12:25:19 2006
@@ -32,7 +32,7 @@
     private final Log log;
     
     public TransportLogger(Transport next) {
-        this( next, LogFactory.getLog(TransportLogger.class.getName()+"."+getNextId()));
+        this( next, LogFactory.getLog(TransportLogger.class.getName()+":"+getNextId()));
     }
     
     synchronized private static int getNextId() {
@@ -49,6 +49,20 @@
             log.debug("SENDING: "+command);
         }
         next.oneway(command);
+    }
+    
+    public void onCommand(Command command) {
+        if( log.isDebugEnabled() ) {
+            log.debug("RECEIVED: "+command);
+        }
+        commandListener.onCommand(command);
+    }
+    
+    public void onException(IOException error) {
+        if( log.isDebugEnabled() ) {
+            log.debug("RECEIVED Exception: "+error, error);
+        }
+        commandListener.onException(error);
     }
     
     public String toString() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
Thu Mar  2 12:25:19 2006
@@ -229,8 +229,8 @@
         if( activeIOTransport.isTrace() ) {
             transport = new TransportLogger(transport);
         }
-        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(),
activityMonitor.getReadCounter(), activityMonitor.getWriteCounter());
         transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion());
+        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
         return transport;
@@ -275,8 +275,8 @@
         if( activeIOTransport.isTrace() ) {
             transport = new TransportLogger(transport);
         }
-        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration(),
activityMonitor.getReadCounter(), activityMonitor.getWriteCounter());
         transport = new WireFormatNegotiator(transport,format, activeIOTransport.getMinmumWireFormatVersion());
+        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
         return transport;        
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Thu Mar  2 12:25:19 2006
@@ -55,7 +55,8 @@
     private boolean trace;
     private boolean useLocalHost = true;
     private int minmumWireFormatVersion;
-
+    private long maxInactivityDuration = 30000;
+    
     /**
      * Construct basic helpers
      * 
@@ -273,6 +274,14 @@
         if (dataIn != null) {
             dataIn.close();
         }
+    }
+
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
Thu Mar  2 12:25:19 2006
@@ -26,6 +26,7 @@
 import javax.net.SocketFactory;
 import org.activeio.command.WireFormat;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
@@ -67,9 +68,12 @@
         // transport = new InactivityMonitor(transport,
         // temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(),
         // activityMonitor.getWriteCounter());
+
         if( format instanceof OpenWireFormat )
             transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion());
         
+        transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
+        
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
         return transport;
@@ -87,6 +91,7 @@
         // temp.getMaxInactivityDuration(), activityMonitor.getReadCounter(),
         // activityMonitor.getWriteCounter());
         transport = new WireFormatNegotiator(transport, format, tcpTransport.getMinmumWireFormatVersion());
+        transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
         return transport;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=382503&r1=382502&r2=382503&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Thu Mar  2 12:25:19 2006
@@ -48,6 +48,9 @@
     private int backlog = 5000;
     private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
     private TcpTransportFactory transportFactory = new TcpTransportFactory();
+    private long maxInactivityDuration = 30000;
+    private int minmumWireFormatVersion;
+    private boolean trace;
     
     /**
      * Constructor
@@ -101,6 +104,9 @@
                     }
                     else {
                         HashMap options = new HashMap();
+                        options.put("maxInactivityDuration", new Long(maxInactivityDuration));
+                        options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion));
+                        options.put("trace", new Boolean(trace));
                         WireFormat format = wireFormatFactory.createWireFormat();
                         TcpTransport transport = new TcpTransport(format, socket);
                         Transport configuredTransport = transportFactory.configure(transport,
format, options);
@@ -180,5 +186,29 @@
         if (serverSocket != null) {
             serverSocket.close();
         }
+    }
+
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
+
+    public int getMinmumWireFormatVersion() {
+        return minmumWireFormatVersion;
+    }
+
+    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+        this.minmumWireFormatVersion = minmumWireFormatVersion;
+    }
+
+    public boolean isTrace() {
+        return trace;
+    }
+
+    public void setTrace(boolean trace) {
+        this.trace = trace;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=382503&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
Thu Mar  2 12:25:19 2006
@@ -0,0 +1,165 @@
+package org.apache.activemq.transport.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+
+public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener
{
+    
+    private TransportServer server;
+    private Transport clientTransport;
+    private Transport serverTransport;
+    
+    private final AtomicInteger clientReceiveCount = new AtomicInteger(0);
+    private final AtomicInteger clientErrorCount = new AtomicInteger(0);
+    private final AtomicInteger serverReceiveCount = new AtomicInteger(0);
+    private final AtomicInteger serverErrorCount = new AtomicInteger(0);
+    
+    private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
+    private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
+    
+    public Runnable serverRunOnCommand;
+    public Runnable clientRunOnCommand;
+    
+    public long clientInactivityLimit;
+    public long serverInactivityLimit;
+
+    
+    protected void setUp() throws Exception {
+        super.setUp();
+        server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?maxInactivityDuration="+serverInactivityLimit));
+        server.setAcceptListener(this);
+        server.start();
+        clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?maxInactivityDuration="+clientInactivityLimit));
+        clientTransport.setTransportListener(new TransportListener() {
+            public void onCommand(Command command) {
+                clientReceiveCount.incrementAndGet();
+                if( clientRunOnCommand !=null ) {
+                    clientRunOnCommand.run();
+                }
+            }
+            public void onException(IOException error) {
+                if( !ignoreClientError.get() ) {
+                    System.out.println("Client transport error:");
+                    error.printStackTrace();
+                    clientErrorCount.incrementAndGet();
+                }
+            }
+            public void transportInterupted() {
+            }
+            public void transportResumed() {
+            }});
+        clientTransport.start();
+    }
+    
+    protected void tearDown() throws Exception {
+        ignoreClientError.set(true);
+        ignoreServerError.set(true);
+        clientTransport.stop();
+        serverTransport.stop();
+        server.stop();
+        super.tearDown();
+    }
+    
+    public void onAccept(Transport transport) {
+        try {
+            serverTransport = transport;
+            serverTransport.setTransportListener(new TransportListener() {
+                public void onCommand(Command command) {
+                    serverReceiveCount.incrementAndGet();
+                    if( serverRunOnCommand !=null ) {
+                        serverRunOnCommand.run();
+                    }
+                }
+                public void onException(IOException error) {
+                    if( !ignoreClientError.get() ) {
+                        System.out.println("Server transport error:");
+                        error.printStackTrace();
+                        serverErrorCount.incrementAndGet();
+                    }
+                }
+                public void transportInterupted() {
+                }
+                public void transportResumed() {
+                }});
+            serverTransport.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void onAcceptError(Exception error) {
+        error.printStackTrace();
+    }
+
+    public void initCombosForTestClientHang() {
+        addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)});
+        addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
+    }
+    public void testClientHang() throws Exception {
+        
+        assertEquals(0, serverErrorCount.get());
+        assertEquals(0, clientErrorCount.get());
+        
+        // Server should consider the client timed out right away since the client is not
hart beating fast enough.
+        Thread.sleep(3000);
+        
+        assertEquals(0, clientErrorCount.get());
+        assertTrue(serverErrorCount.get()>0);
+    }
+    
+    public void initCombosForTestNoClientHang() {
+        addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
+        addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
+    }
+    public void testNoClientHang() throws Exception {
+        
+        assertEquals(0, serverErrorCount.get());
+        assertEquals(0, clientErrorCount.get());
+        
+        Thread.sleep(4000);
+        
+        assertEquals(0, clientErrorCount.get());
+        assertEquals(0, serverErrorCount.get());
+    }
+
+    /**
+     * Used to test when a operation blocks.  This should
+     * not cause transport to get disconnected.
+     */
+    public void initCombosForTestNoClientHangWithServerBlock() {
+        addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
+        addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
+        addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() {
+                public void run() {
+                    try {
+                        System.out.println("Sleeping");
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }});
+    }
+    public void testNoClientHangWithServerBlock() throws Exception {
+        
+        assertEquals(0, serverErrorCount.get());
+        assertEquals(0, clientErrorCount.get());
+        
+        Thread.sleep(4000);
+        
+        assertEquals(0, clientErrorCount.get());
+        assertEquals(0, serverErrorCount.get());
+    }
+
+}



Mime
View raw message