tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r385872 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/tcp/ src/share/org/apache/catalina/tribes/tcp/bio/ src/share/org/apache/catalina/tribes/tcp/nio/ test/org/apache/catalina/tribes/demos/
Date Tue, 14 Mar 2006 20:15:23 GMT
Author: fhanik
Date: Tue Mar 14 12:15:19 2006
New Revision: 385872

URL: http://svn.apache.org/viewcvs?rev=385872&view=rev
Log:
Worked on the pool, best to keep using the thread until it is done in regular BIO style.


Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Tue Mar 14 12:15:19 2006
@@ -101,9 +101,13 @@
         queue.setLimit(poolSize);
     }
 
-    public void setSuspect(Boolean suspect) {
+    public void setSuspect(boolean suspect) {
         this.suspect = suspect;
     }
+    
+    public boolean getSuspect() {
+        return suspect;
+    }
 
     public boolean isConnected() {
         return connected;
@@ -244,15 +248,4 @@
             notify();
         }
     }
-    
-    public static void printArr(Object[] arr) {
-        System.out.print("[");
-        for (int i=0; i<arr.length; i++ ) {
-            System.out.print(arr[i]);
-            if ( (i+1)<arr.length )System.out.print(", ");
-        }
-        System.out.println("]");
-    }
-
-    
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Tue Mar 14 12:15:19 2006
@@ -180,7 +180,7 @@
                 InetSocketAddress addr = new InetSocketAddress(getBind(), portstart);
                 socket.bind(addr);
                 setTcpListenPort(portstart);
-                log.info("Nio Server Socket bound to:"+addr);
+                log.info("Receiver Server Socket bound to:"+addr);
                 return 0;
             }catch ( IOException x) {
                 retries--;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java Tue Mar 14 12:15:19 2006
@@ -16,18 +16,14 @@
 package org.apache.catalina.tribes.tcp.bio;
 
 import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
 
 import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.MessageListener;
-import java.net.InetAddress;
-import org.apache.catalina.tribes.tcp.nio.ThreadPool;
-import java.net.ServerSocket;
-import java.net.InetSocketAddress;
 import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import java.net.Socket;
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.tcp.ReceiverBase;
+import org.apache.catalina.tribes.tcp.nio.ThreadPool;
 
 /**
  * <p>Title: </p>
@@ -113,7 +109,6 @@
         } catch (Exception x) {
             log.error("Unable to run replication listener.", x);
         }
-
     }
     
     public void listen() throws Exception {
@@ -125,6 +120,12 @@
 
         while ( doListen ) {
             Socket socket = null;
+            if ( pool.available() < 1 ) {
+                if ( log.isWarnEnabled() )
+                    log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
+            }
+            TcpReplicationThread thread = (TcpReplicationThread)pool.getWorker();
+            if ( thread == null ) continue; //should never happen
             try {
                 socket = serverSocket.accept();
             }catch ( Exception x ) {
@@ -134,18 +135,8 @@
             if ( socket == null ) continue;
             socket.setReceiveBufferSize(rxBufSize);
             socket.setSendBufferSize(txBufSize);
-            TcpReplicationThread thread = (TcpReplicationThread)pool.getWorker();
             ObjectReader reader = new ObjectReader(socket,this);
-
-            if ( thread == null ) {
-                //we are out of workers, process the request on the listening thread
-                thread = getReplicationThread();
-                thread.socket = socket;
-                thread.reader = reader;
-                thread.run();
-            } else { 
-                thread.serviceSocket(socket,reader);
-            }//end if
+            thread.serviceSocket(socket,reader);
         }//while
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Tue Mar 14 12:15:19 2006
@@ -348,7 +348,35 @@
      *      ChannelMessage)
      */
     public  void sendMessage(byte[] data) throws IOException {
-        pushMessage(data);
+        boolean messageTransfered = false ;
+        IOException exception = null;
+        try {
+             // first try with existing connection
+             pushMessage(data,false);
+             messageTransfered = true ;
+        } catch (IOException x) {
+            exception = x;
+            //resend
+            if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x);
+            try {
+                // second try with fresh connection
+                pushMessage(data,true);                    
+                messageTransfered = true;
+                exception = null;
+            } catch (IOException xx) {
+                exception = xx;
+                closeSocket();
+            }
+        } finally {
+            this.requestCount++;
+            keepalive();
+            if(messageTransfered) {
+
+            } else {
+                if ( exception != null ) throw exception;
+            }
+        }
+
     }
 
     
@@ -444,37 +472,6 @@
         writeData(data);
     }
     
-    protected  void pushMessage( byte[] data) throws IOException {
-        boolean messageTransfered = false ;
-        IOException exception = null;
-        try {
-             // first try with existing connection
-             pushMessage(data,false);
-             messageTransfered = true ;
-        } catch (IOException x) {
-            exception = x;
-            //resend
-            if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),new Integer(port)),x);
-            try {
-                // second try with fresh connection
-                pushMessage(data,true);                    
-                messageTransfered = true;
-                exception = null;
-            } catch (IOException xx) {
-                exception = xx;
-                closeSocket();
-            }
-        } finally {
-            this.requestCount++;
-            keepalive();
-            if(messageTransfered) {
-                
-            } else {
-                if ( exception != null ) throw exception;
-            }
-        }
-    }
-
     /**
      * Sent real cluster Message to socket stream
      * FIXME send compress

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java Tue Mar 14 12:15:19 2006
@@ -22,13 +22,9 @@
 public class PooledMultiSender extends PooledSender {
     
     protected long timeout = 15000;
-    protected boolean waitForAck = false;
     protected int retryAttempts=0;
     protected int keepAliveCount = Integer.MAX_VALUE;
     protected boolean directBuf = false;
-    protected int rxBufSize = 43800;
-    protected int txBufSize = 25188;
-    protected boolean suspect = false;
     private boolean autoConnect;
     private boolean useDirectBuffer;
 
@@ -63,10 +59,10 @@
         MultipointBioSender sender = new MultipointBioSender();
         sender.setAutoConnect(autoConnect);
         sender.setTimeout(timeout);
-        sender.setWaitForAck(waitForAck);
+        sender.setWaitForAck(getWaitForAck());
         sender.setMaxRetryAttempts(retryAttempts);
-        sender.setRxBufSize(rxBufSize);
-        sender.setTxBufSize(txBufSize);
+        sender.setRxBufSize(getRxBufSize());
+        sender.setTxBufSize(getTxBufSize());
         return sender;
     }
 
@@ -86,16 +82,9 @@
         this.retryAttempts = retryAttempts;
     }
 
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
 
     public void setUseDirectBuffer(boolean useDirectBuffer) {
         this.useDirectBuffer = useDirectBuffer;
-    }
-
-    public boolean getSuspect() {
-        return suspect;
     }
 
     public boolean isUseDirectBuffer() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/TcpReplicationThread.java Tue Mar 14 12:15:19 2006
@@ -23,6 +23,7 @@
 import java.net.Socket;
 import java.io.InputStream;
 import org.apache.catalina.tribes.tcp.ReceiverBase;
+import java.io.OutputStream;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -72,8 +73,8 @@
                 } catch ( Exception x ) {
                     log.error("Unable to service bio socket");
                 }finally {
-                    try {reader.close();}catch ( Exception x){}
-                    try {socket.close();}catch ( Exception x){}
+                    try {socket.close();}catch ( Exception ignore){}
+                    try {reader.close();}catch ( Exception ignore){}
                     reader = null;
                     socket = null;
                 }
@@ -137,8 +138,7 @@
         while ( length >= 0 ) {
             int count = reader.append(buf,0,length,true);
             if ( count > 0 ) execute(reader);
-            if ( in.available() == 0 && reader.bufferSize() == 0 ) length = -1;
-            else length = in.read(buf);
+            length = in.read(buf);
         }
     }
 
@@ -163,12 +163,23 @@
      */
     protected void sendAck() {
         try {
-            this.socket.getOutputStream().write(Constants.ACK_COMMAND);
+            OutputStream out = socket.getOutputStream();
+            out.write(Constants.ACK_COMMAND);
+            out.flush();
             if (log.isTraceEnabled()) {
                 log.trace("ACK sent to " + socket.getPort());
             }
         } catch ( java.io.IOException x ) {
             log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
         }
+    }
+    
+    public void close() {
+        doRun = false;
+        try {socket.close();}catch ( Exception ignore){}
+        try {reader.close();}catch ( Exception ignore){}
+        reader = null;
+        socket = null;
+        super.close();
     }
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Tue Mar 14 12:15:19 2006
@@ -75,6 +75,7 @@
             long delta = System.currentTimeMillis() - start;
             while ( (remaining>0) && (delta<timeout) ) {
                 remaining -= doLoop(selectTimeout,retryAttempts);
+                delta = System.currentTimeMillis() - start;
             }
             if ( remaining > 0 ) {
                 //timeout has occured

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ThreadPool.java Tue Mar 14 12:15:19 2006
@@ -94,6 +94,10 @@
 
         return (worker);
     }
+    
+    public int available() {
+        return idle.size();
+    }
 
     /**
      * Called by the worker thread to return itself to the

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385872&r1=385871&r2=385872&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Mar 14 12:15:19 2006
@@ -26,11 +26,9 @@
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.mcast.McastService;
 import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.tcp.ReceiverBase;
 import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
-import org.apache.catalina.tribes.tcp.nio.NioReceiver;
-import org.apache.catalina.tribes.tcp.bio.BioReceiver;
 import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.catalina.tribes.tcp.ReceiverBase;
 
 /**
  * <p>Title: </p>
@@ -56,7 +54,7 @@
            .append("\n\t\t[-autoconnect true|false]")
            .append("\n\t\t[-sync true|false]")
            .append("\n\t\t[-receiver org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
-           .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultipointSender]")
+           .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultiSender]")
            .append("\n\t\t[-transport.xxx transport specific property]")
            .append("\n\t\t[-maddr multicastaddr]")
            .append("\n\t\t[-mport multicastport]")



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message