tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r382464 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: NioSender.java ParallelNioSender.java
Date Thu, 02 Mar 2006 18:47:03 GMT
Author: fhanik
Date: Thu Mar  2 10:47:01 2006
New Revision: 382464

URL: http://svn.apache.org/viewcvs?rev=382464&view=rev
Log:
Initial skeleton of the ParallelNioSender has been completed. This little piece of code has
the ability to transmit to multiple remote destinations in parallel since it uses non blocking
IO and only one thread.

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java?rev=382464&r1=382463&r2=382464&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
Thu Mar  2 10:47:01 2006
@@ -19,17 +19,15 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
 
-import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
-import java.net.Socket;
 
 /**
  * This class is NOT thread safe and should never be used with more than one thread at a
time
@@ -49,9 +47,6 @@
 
     protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioSender.class);
 
-    
-    protected long ackTimeout = 15000;
-    protected String domain = "";
     protected boolean suspect = false;
     protected boolean connected = false;
     protected boolean waitForAck = false;
@@ -72,6 +67,8 @@
     protected int curPos=0;
     protected XByteBuffer ackbuf = new XByteBuffer(128,true);
     protected int remaining = 0;
+    private boolean complete;
+    private int attempt;
 
     public NioSender(Member destination) {
         this.destination = destination;
@@ -204,13 +201,15 @@
     public void disconnect() {
         try {
             this.connected = false;
-            Socket socket = socketChannel.socket();
-            socket.shutdownOutput();
-            socket.shutdownInput();
-            socket.close();
-            socketChannel.close();
-            socket = null;
-            socketChannel = null;
+            if ( socketChannel != null ) {
+                Socket socket = socketChannel.socket();
+                socket.shutdownOutput();
+                socket.shutdownInput();
+                socket.close();
+                socketChannel.close();
+                socket = null;
+                socketChannel = null;
+            }
         } catch ( Exception x ) {
             log.error("Unable to disconnect.",x);
         } finally {
@@ -228,6 +227,8 @@
         curPos = 0;
         ackbuf.clear();
         remaining = 0;
+        complete = false;
+        attempt = 0;
     }
 
     private ByteBuffer getReadBuffer() {
@@ -252,6 +253,10 @@
            }
        } 
    }
+   
+   public byte[] getMessage() {
+       return current;
+   }
 
 
     /**
@@ -263,19 +268,6 @@
     public boolean checkKeepAlive() {
         return false;
     }
-
-    /**
-     * getAckTimeout
-     *
-     * @return long
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public long getAckTimeout() {
-        return this.ackTimeout;
-    }
-
-    
-
     /**
      * getSuspect
      *
@@ -313,14 +305,17 @@
     public boolean getDirect() {
         return direct;
     }
-    /**
-     * setAckTimeout
-     *
-     * @param timeout long
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setAckTimeout(long timeout) {
-        this.ackTimeout = timeout;
+
+    public Member getDestination() {
+        return destination;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public int getAttempt() {
+        return attempt;
     }
 
     /**
@@ -369,5 +364,13 @@
 
     public void setDirect(boolean directBuffer) {
         this.direct = directBuffer;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    public void setAttempt(int attempt) {
+        this.attempt = attempt;
     }
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java?rev=382464&r1=382463&r2=382464&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
Thu Mar  2 10:47:01 2006
@@ -16,11 +16,17 @@
 package org.apache.catalina.tribes.tcp;
 
 
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.io.ClusterData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Iterator;
+import java.nio.channels.SelectionKey;
 
 /**
  * <p>Title: </p>
@@ -35,22 +41,168 @@
  * @version 1.0
  */
 public class ParallelNioSender {
-    protected long timeout;
-    protected boolean waitForAck;
-    
-    public ParallelNioSender(long timeout, boolean waitForAck) {
+    protected long timeout = 15000;
+    protected long selectTimeout = 50; 
+    protected boolean waitForAck = false;
+    protected int retryAttempts=0;
+    protected int keepAliveCount = Integer.MAX_VALUE;
+    protected Selector selector;
+    protected HashMap nioSenders = new HashMap();
+    protected boolean directBuf = false;
+    protected int rxBufSize = 43800;
+    protected int txBufSize = 25188;
+    public ParallelNioSender(long timeout, 
+                             boolean waitForAck,
+                             int retryAttempts,
+                             boolean directBuf,
+                             int rxBufSize,
+                             int txBufSize) throws IOException {
         this.timeout = timeout;
         this.waitForAck = waitForAck;
+        this.retryAttempts = retryAttempts;
+        selector = Selector.open();
+        this.directBuf = directBuf;
+        this.rxBufSize = rxBufSize;
+        this.txBufSize = txBufSize;
     }
     
     
-    public synchronized void sendMessage(Member mbr, ChannelMessage msg) throws ChannelException
{
+    public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws
ChannelException {
         long start = System.currentTimeMillis();
         byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);
+        NioSender[] senders = setupForSend(destination);
+        connect(senders);
+        setData(senders,data);
+        int remaining = senders.length;
+        try {
+            //loop until complete, an error happens, or we timeout
+            long delta = System.currentTimeMillis() - start;
+            while ( (remaining>0) && (delta<timeout) ) {
+                remaining -= doLoop(selectTimeout,retryAttempts);
+            }
+            if ( remaining > 0 ) {
+                //timeout has occured
+                ChannelException cx = new ChannelException("Operation has timed out("+timeout+"
ms.).");
+                for (int i=0; i<senders.length; i++ ) {
+                    if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());
+                }
+                throw cx;
+            }
+        } catch (Exception x ) {
+            try { this.close(); } catch (Exception ignore) {}
+            if ( x instanceof ChannelException ) throw (ChannelException)x;
+            else throw new ChannelException(x);
+        }
         
-        
     }
     
-    protected synchronized
+    private int doLoop(long selectTimeOut, int maxAttempts) throws IOException, ChannelException
{
+        int completed = 0;
+        int selectedKeys = selector.select(selectTimeOut);
+        if (selectedKeys == 0) {
+            return 0;
+        }
+        Iterator it = selector.selectedKeys().iterator();
+        while (it.hasNext()) {
+            SelectionKey sk = (SelectionKey) it.next();
+            it.remove();
+            int readyOps = sk.readyOps();
+            sk.interestOps(sk.interestOps() & ~readyOps);
+            NioSender sender = (NioSender) sk.attachment();
+            try {
+                if (sender.process(sk)) {
+                    sender.reset();
+                    completed++;
+                    sender.setComplete(true);
+                }//end if
+            } catch (Exception x) {
+                byte[] data = sender.getMessage();
+                int attempt = sender.getAttempt()+1;
+                if ( sender.getAttempt() >= maxAttempts && maxAttempts>0 )
{
+                    try { 
+                        sender.disconnect(); 
+                        sender.connect();
+                        sender.setAttempt(attempt);
+                        sender.setMessage(data);
+                    }catch ( Exception ignore){
+                        //dont report the error on a resend
+                    }
+                } else {
+                    ChannelException cx = new ChannelException(x);
+                    cx.addFaultyMember(sender.getDestination());
+                    throw cx;
+                }//end if
+            }
+        }
+        return completed;
+
+    }
+    
+    private void connect(NioSender[] senders) throws ChannelException {
+        ChannelException x = null;
+        for (int i=0; i<senders.length; i++ ) {
+            try {
+                if (!senders[i].isConnected()) senders[i].connect();
+            }catch ( IOException io ) {
+                if ( x==null ) x = new ChannelException(io);
+                x.addFaultyMember(senders[i].getDestination());
+            }
+        }
+        if ( x != null ) throw x;
+    }
+    
+    private void setData(NioSender[] senders, byte[] data) throws ChannelException {
+        ChannelException x = null;
+        for (int i=0; i<senders.length; i++ ) {
+            try {
+                senders[i].setMessage(data);
+            }catch ( IOException io ) {
+                if ( x==null ) x = new ChannelException(io);
+                x.addFaultyMember(senders[i].getDestination());
+            }
+        }
+        if ( x != null ) throw x;
+    }
+    
+    
+    private NioSender[] setupForSend(Member[] destination) {
+        NioSender[] result = new NioSender[destination.length];
+        for ( int i=0; i<destination.length; i++ ) {
+            NioSender sender = (NioSender)nioSenders.get(destination[i]);
+            if ( sender == null ) {
+                sender = new NioSender(destination[i]);
+                nioSenders.put(destination[i],sender);
+            }
+            sender.reset();
+            sender.setSelector(selector);
+            sender.setDirect(directBuf);
+            sender.setRxBufSize(rxBufSize);
+            sender.setTxBufSize(txBufSize);
+            sender.setWaitForAck(waitForAck);
+            result[i] = sender;
+        }
+        return result;
+    }
+    
+    public synchronized void close() throws ChannelException  {
+        ChannelException x = null;
+        Object[] members = nioSenders.keySet().toArray();
+        for (int i=0; i<members.length; i++ ) {
+            Member mbr = (Member)members[i];
+            try {
+                NioSender sender = (NioSender)nioSenders.get(mbr);
+                sender.disconnect();
+            }catch ( Exception e ) {
+                if ( x == null ) x = new ChannelException(e);
+                x.addFaultyMember(mbr);
+            }
+            nioSenders.remove(mbr);
+        }
+        if ( x != null ) throw x;
+    }
+    
+    public void finalize() {
+        try {close(); }catch ( Exception ignore){}
+    }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message