tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r386711 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/ src/share/org/apache/catalina/tribes/group/ src/share/org/apache/catalina/tribes/io/ src/share/org/apache/catalina/tribes/tcp/ src/share/org/a...
Date Fri, 17 Mar 2006 20:57:08 GMT
Author: fhanik
Date: Fri Mar 17 12:57:04 2006
New Revision: 386711

URL: http://svn.apache.org/viewcvs?rev=386711&view=rev
Log:
Major refactor: instead of hardcoding ack and acksync, these are now options you can attach to each message.
This way, one channel can serve many applications, all of which have different needs.

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Fri Mar 17 12:57:04 2006
@@ -36,6 +36,16 @@
     public static final int MBR_TX_SEQ = 8;
     
     /**
+     * 
+     * Send options
+     */
+    public static final int SEND_OPTIONS_USE_ACK = 0x0001;
+    public static final int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0002;
+    public static final int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;
+    public static final int SEND_OPTIONS_BYTE_MESSAGE = 0x0004;
+
+    
+    /**
      * Adds an interceptor to the channel message chain.
      * @param interceptor ChannelInterceptor
      */
@@ -74,7 +84,7 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any. 
      */
-    public void send(Member[] destination, Serializable msg) throws ChannelException;
+    public void send(Member[] destination, Serializable msg, int options) throws ChannelException;
 
     
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java Fri Mar 17 12:57:04 2006
@@ -24,6 +24,9 @@
  */
 public interface ChannelMessage extends Serializable {
     
+    
+    
+    
     /**
      * Get the address that this message originated from.  This would be set
      * if the message was being relayed from a host other than the one

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java Fri Mar 17 12:57:04 2006
@@ -35,18 +35,6 @@
     public void stop();
 
     /**
-     * returns true of the receiver is sending acks when it receives messages
-     * @return boolean
-     */
-    public boolean getSendAck();
-    
-    /**
-     * set ack mode
-     * @param isSendAck
-     */
-    public void setSendAck(boolean sendack);
-    
-    /**
      * get the listing ip interface
      * @return The host
      */

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Mar 17 12:57:04 2006
@@ -43,7 +43,7 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
  */
 public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
-    public static final int BYTE_MESSAGE = 0x0001;
+    
     
     private ChannelCoordinator coordinator = new ChannelCoordinator();
     private ChannelInterceptor interceptors = null;
@@ -90,19 +90,18 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public void send(Member[] destination, Serializable msg) throws ChannelException {
+    public void send(Member[] destination, Serializable msg, int options) throws ChannelException {
         if ( msg == null ) return;
         try {
             if ( destination == null ) throw new ChannelException("No destination given");
             if ( destination.length == 0 ) return;
-            int options = 0;
             ClusterData data = new ClusterData();//generates a unique Id
             data.setAddress(getLocalMember(false));
             data.setTimestamp(System.currentTimeMillis());
             byte[] b = null;
             if ( msg instanceof ByteMessage ){
                 b = ((ByteMessage)msg).getMessage();
-                options = options | BYTE_MESSAGE;
+                options = options | SEND_OPTIONS_BYTE_MESSAGE;
             } else {
                 b = XByteBuffer.serialize(msg);
             }
@@ -122,7 +121,7 @@
         try {
             
             Serializable fwd = null;
-            if ( (msg.getOptions() & BYTE_MESSAGE) == BYTE_MESSAGE ) {
+            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
                 fwd = new ByteMessage(msg.getMessage().getBytes());
             } else {
                 fwd = XByteBuffer.deserialize(msg.getMessage().getBytes());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Fri Mar 17 12:57:04 2006
@@ -15,17 +15,13 @@
  */
 package org.apache.catalina.tribes.io;
 
+import java.util.Arrays;
+
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
 import org.apache.catalina.tribes.mcast.MemberImpl;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.util.Arrays;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * The cluster data class is used to transport around the byte array from
@@ -226,6 +222,17 @@
         clone.uniqueId = this.uniqueId;
         clone.address = this.address;
         return clone;
+    }
+    
+    public static boolean sendAckSync(int options) {
+        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+    }
+
+
+    public static boolean sendAckAsync(int options) {
+        return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) &&
+            ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
     }
     
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Fri Mar 17 12:57:04 2006
@@ -15,13 +15,13 @@
  */
 package org.apache.catalina.tribes.io;
 
+import java.io.IOException;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+
 import org.apache.catalina.tribes.ChannelMessage;
-import java.io.IOException;
-import java.net.Socket;
-import org.apache.catalina.tribes.tcp.*;
 
 
 
@@ -41,10 +41,6 @@
 
     protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ObjectReader.class);
 
-    private SocketChannel channel;
-
-    private ListenCallback callback;
-
     private XByteBuffer buffer;
 
     /**
@@ -53,18 +49,10 @@
      * @param selector
      * @param callback
      */
-    public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback) {
-        this.channel = channel;
-        this.callback = callback;
-        try {
-            this.buffer = new XByteBuffer(channel.socket().getReceiveBufferSize(), true);
-        }catch ( IOException x ) {
-            //unable to get buffer size
-            log.warn("Unable to retrieve the socket channel receiver buffer size, setting to default 43800 bytes.");
-            this.buffer = new XByteBuffer(43800,true);
-        }
+    public ObjectReader(SocketChannel channel) {
+        this(channel.socket());
     }
-    public ObjectReader(Socket socket, ListenCallback callback) {
+    public ObjectReader(Socket socket) {
         try{
             this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true);
         }catch ( IOException x ) {
@@ -72,23 +60,6 @@
             log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
             this.buffer = new XByteBuffer(43800,true);
         }
-        this.callback = callback;
-    }
-
-    /**
-     * get the current SimpleTcpCluster
-     * @return Returns the callback.
-     */
-    public ListenCallback getCallback() {
-        return callback;
-    }
-
-    /**
-     * Get underlying NIO channel
-     * @return The socket
-     */
-    public SocketChannel getChannel() {
-        return this.channel;
     }
 
     /**
@@ -125,16 +96,14 @@
      * @return number of received packages/messages
      * @throws java.io.IOException
      */
-    public int execute() throws java.io.IOException {
-        int pkgCnt = 0;
-        boolean pkgExists = buffer.doesPackageExist();
-        while ( pkgExists ) {
+    public ChannelMessage[] execute() throws java.io.IOException {
+        int pkgCnt = buffer.countPackages();
+        ChannelMessage[] result = new ChannelMessage[pkgCnt];
+        for (int i=0; i<pkgCnt; i++)  {
             ChannelMessage data = buffer.extractPackage(true);
-            getCallback().messageDataReceived(data);
-            pkgCnt++;
-            pkgExists = buffer.doesPackageExist();
+            result[i] = data;
         }
-        return pkgCnt;
+        return result;
     }
     
     public int bufferSize() {
@@ -149,19 +118,7 @@
         return buffer.countPackages();
     }
     
-    /**
-     * Write Ack to sender
-     * @param buf
-     * @return The bytes written count
-     * @throws java.io.IOException
-     */
-    public int write(ByteBuffer buf) throws java.io.IOException {
-        return getChannel().write(buf);
-    }
-    
     public void close() {
-        this.callback = null;
-        this.channel = null;
         this.buffer = null;
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractSender.java Fri Mar 17 12:57:04 2006
@@ -37,7 +37,6 @@
 public abstract class AbstractSender implements DataSender {
     
     private boolean connected = false;
-    private boolean waitForAck = false;
     private int rxBufSize = 25188;
     private int txBufSize = 43800;
     private boolean directBuffer = false;
@@ -138,10 +137,6 @@
         return txBufSize;
     }
 
-    public boolean getWaitForAck() {
-        return waitForAck;
-    }
-
     public InetAddress getAddress() {
         return address;
     }
@@ -188,10 +183,6 @@
 
     public void setTxBufSize(int txBufSize) {
         this.txBufSize = txBufSize;
-    }
-
-    public void setWaitForAck(boolean waitForAck) {
-        this.waitForAck = waitForAck;
     }
 
     public void setConnectTime(long connectTime) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Fri Mar 17 12:57:04 2006
@@ -37,7 +37,6 @@
     public void setTxBufSize(int size);
     public boolean keepalive();
     public void setTimeout(long timeout);
-    public void setWaitForAck(boolean isWaitForAck);
     public void setKeepAliveCount(int maxRequests);
     public void setKeepAliveTime(long keepAliveTimeInMs);
     public int getRequestCount();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Fri Mar 17 12:57:04 2006
@@ -40,8 +40,6 @@
  */
 public abstract class ReceiverBase implements ChannelReceiver, ListenCallback {
 
-    public static final int OPTION_SEND_ACK = 0x0001;
-    public static final int OPTION_SYNCHRONIZED = 0x0002;
     public static final int OPTION_DIRECT_BUFFER = 0x0004;
 
 
@@ -51,7 +49,6 @@
     private String host;
     private InetAddress bind;
     private int port;
-    private boolean sync;
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
     private int tcpThreadCount;
@@ -60,7 +57,6 @@
     private boolean direct = true;
     private long tcpSelectorTimeout;
     private String tcpListenAddress;
-    private boolean sendAck;
     //how many times to search for an available socket
     private int autoBind = 1;
 
@@ -99,15 +95,6 @@
     }
 
     /**
-     *
-     * @return boolean
-     * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
-     */
-    public boolean getSendAck() {
-        return sendAck;
-    }
-
-    /**
      * setMessageListener
      *
      * @param listener MessageListener
@@ -125,10 +112,6 @@
         this.host = tcpListenHost;
     }
 
-    public void setSynchronized(boolean sync) {
-        this.sync = sync;
-    }
-
     public void setRxBufSize(int rxBufSize) {
         this.rxBufSize = rxBufSize;
     }
@@ -194,8 +177,6 @@
     
     public int getWorkerThreadOptions() {
         int options = 0;
-        if ( getSynchronized() ) options = options |OPTION_SYNCHRONIZED;
-        if ( getSendAck() ) options = options |OPTION_SEND_ACK;
         if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER;
         return options;
     }
@@ -213,10 +194,6 @@
         return this.port;
     }
 
-    public boolean isSync() {
-        return sync;
-    }
-
     public boolean getDirect() {
         return direct;
     }
@@ -228,11 +205,6 @@
     }
 
 
-    public boolean getSynchronized() {
-        return this.sync;
-    }
-
-
 
     public String getHost() {
         getBind();
@@ -289,14 +261,6 @@
 
     public void setPort(int port) {
         this.port = port;
-    }
-
-    public void setSync(boolean sync) {
-        this.sync = sync;
-    }
-
-    public void setSendAck(boolean sendAck) {
-        this.sendAck = sendAck;
     }
 
     public void setAutoBind(int autoBind) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java Fri Mar 17 12:57:04 2006
@@ -111,7 +111,7 @@
                 mutex.notify();
             }
         }else {
-            worker.doRun = false;
+            worker.setDoRun(false);
             synchronized (worker){worker.notify();}
         }
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java Fri Mar 17 12:57:04 2006
@@ -16,6 +16,8 @@
 
 package org.apache.catalina.tribes.tcp;
 
+import org.apache.catalina.tribes.io.ListenCallback;
+
 
 
 
@@ -23,18 +25,20 @@
  * @author Filip Hanik
  * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
  */
-public class WorkerThread extends Thread
+public abstract class WorkerThread extends Thread 
 {
     
-    public static final int OPTION_SEND_ACK = ReceiverBase.OPTION_SEND_ACK;
-    public static final int OPTION_SYNCHRONIZED = ReceiverBase.OPTION_SYNCHRONIZED;
     public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
     
-    
-    protected ThreadPool pool;
-    protected boolean doRun = true;
+    private ListenCallback callback;
+    private ThreadPool pool;
+    private boolean doRun = true;
     private int options;
 
+    public WorkerThread(ListenCallback callback) {
+        this.callback = callback;
+    }
+
     public void setPool(ThreadPool pool) {
         this.pool = pool;
     }
@@ -43,6 +47,14 @@
         this.options = options;
     }
 
+    public void setCallback(ListenCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setDoRun(boolean doRun) {
+        this.doRun = doRun;
+    }
+
     public ThreadPool getPool() {
         return pool;
     }
@@ -51,21 +63,18 @@
         return options;
     }
 
+    public ListenCallback getCallback() {
+        return callback;
+    }
+
+    public boolean isDoRun() {
+        return doRun;
+    }
+
     public void close()
     {
         doRun = false;
         notify();
     }
     
-    public boolean sendAckSync() {
-        int options = getOptions();
-        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
-               ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED);
-    }
-
-    public boolean sendAckAsync() {
-        int options = getOptions();
-        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
-               ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED);
-    }    
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java Fri Mar 17 12:57:04 2006
@@ -75,7 +75,7 @@
     }
     
     protected BioReplicationThread getReplicationThread() {
-        BioReplicationThread result = new BioReplicationThread();
+        BioReplicationThread result = new BioReplicationThread(this);
         result.setOptions(getWorkerThreadOptions());
         return result;
     }
@@ -136,7 +136,7 @@
             if ( socket == null ) continue;
             socket.setReceiveBufferSize(getRxBufSize());
             socket.setSendBufferSize(getRxBufSize());
-            ObjectReader reader = new ObjectReader(socket,this);
+            ObjectReader reader = new ObjectReader(socket);
             thread.serviceSocket(socket,reader);
         }//while
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java Fri Mar 17 12:57:04 2006
@@ -24,6 +24,9 @@
 import java.io.InputStream;
 import org.apache.catalina.tribes.tcp.ReceiverBase;
 import java.io.OutputStream;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ClusterData;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -47,14 +50,14 @@
     protected Socket socket;
     protected ObjectReader reader;
     
-    public BioReplicationThread ()
-    {
+    public BioReplicationThread (ListenCallback callback) {
+        super(callback);
     }
 
     // loop forever waiting for work to do
     public synchronized void run()
     {
-        while (doRun) {
+        while (isDoRun()) {
             try {
                 // sleep and release object lock
                 this.wait();
@@ -77,8 +80,8 @@
                 }
             }
             // done, ready for more, return to pool
-            if ( this.pool != null ) this.pool.returnWorker (this);
-            else doRun = false;
+            if ( getPool() != null ) getPool().returnWorker (this);
+            else setDoRun(false);
         }
     }
 
@@ -91,31 +94,27 @@
     
     protected void execute(ObjectReader reader) throws Exception{
         int pkgcnt = reader.count();
-        /**
-         * Use send ack here if you want to ack the request to the remote 
-         * server before completing the request
-         * This is considered an asynchronized request
-         */
-        if (sendAckAsync()) {
-            while ( pkgcnt > 0 ) {
-                sendAck();
-                pkgcnt--;
-            }
+
+        if ( pkgcnt > 0 ) {
+            ChannelMessage[] msgs = reader.execute();
+            for ( int i=0; i<msgs.length; i++ ) {
+                /**
+                 * Use send ack here if you want to ack the request to the remote 
+                 * server before completing the request
+                 * This is considered an asynchronized request
+                 */
+                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck();
+                //process the message
+                getCallback().messageDataReceived(msgs[i]);
+                /**
+                 * Use send ack here if you want the request to complete on this 
+                 * server before sending the ack to the remote server
+                 * This is considered a synchronized request
+                 */
+                if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck();
+            }                        
         }
-        //check to see if any data is available
-        pkgcnt = reader.execute();
 
-        /**
-         * Use send ack here if you want the request to complete on this 
-         * server before sending the ack to the remote server
-         * This is considered a synchronized request
-         */
-        if (sendAckSync()) {
-            while ( pkgcnt > 0 ) {
-                sendAck();
-                pkgcnt--;
-            }
-        }        
        
     }
 
@@ -161,7 +160,7 @@
     }
     
     public void close() {
-        doRun = false;
+        setDoRun(false);
         try {socket.close();}catch ( Exception ignore){}
         try {reader.close();}catch ( Exception ignore){}
         reader = null;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Fri Mar 17 12:57:04 2006
@@ -124,13 +124,13 @@
      * @see org.apache.catalina.tribes.tcp.IDataSender#sendMessage(,
      *      ChannelMessage)
      */
-    public  void sendMessage(byte[] data) throws IOException {
+    public  void sendMessage(byte[] data, boolean waitForAck) throws IOException {
         boolean messageTransfered = false ;
         IOException exception = null;
         setAttempt(0);
         try {
              // first try with existing connection
-             pushMessage(data,false);
+             pushMessage(data,false,waitForAck);
              messageTransfered = true ;
         } catch (IOException x) {
             SenderState.getSenderState(getDestination()).setSuspect();
@@ -140,7 +140,7 @@
                 try {
                     setAttempt(getAttempt()+1);
                     // second try with fresh connection
-                    pushMessage(data, true);
+                    pushMessage(data, true,waitForAck);
                     messageTransfered = true;
                     exception = null;
                 } catch (IOException xx) {
@@ -245,13 +245,13 @@
      * @since 5.5.10
      */
     
-    protected  void pushMessage(byte[] data, boolean reconnect) throws IOException {
+    protected  void pushMessage(byte[] data, boolean reconnect, boolean waitForAck) throws IOException {
         keepalive();
         if ( reconnect ) closeSocket();
         if (!isConnected()) openSocket();
         soOut.write(data);
         soOut.flush();
-        if (getWaitForAck()) waitForAck();
+        if (waitForAck) waitForAck();
         SenderState.getSenderState(getDestination()).setReady();
 
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Fri Mar 17 12:57:04 2006
@@ -11,6 +11,7 @@
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.tcp.MultiPointSender;
 import org.apache.catalina.tribes.tcp.AbstractSender;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * <p>Title: </p>
@@ -39,7 +40,7 @@
         ChannelException cx = null;
         for ( int i=0; i<senders.length; i++ ) {
             try {
-                senders[i].sendMessage(data);
+                senders[i].sendMessage(data,(msg.getOptions()&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK);
             } catch (Exception x) {
                 if (cx == null) cx = new ChannelException(x);
                 cx.addFaultyMember(destination[i]);
@@ -65,7 +66,6 @@
                     sender.setKeepAliveTime(getKeepAliveTime());
                     bioSenders.put(destination[i], sender);
                 }
-                sender.setWaitForAck(getWaitForAck());
                 result[i] = sender;
                 if (!result[i].isConnected() ) result[i].connect();
                 result[i].keepalive();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java Fri Mar 17 12:57:04 2006
@@ -52,7 +52,6 @@
     public DataSender getNewDataSender() {
         MultipointBioSender sender = new MultipointBioSender();
         sender.setTimeout(getTimeout());
-        sender.setWaitForAck(getWaitForAck());
         sender.setMaxRetryAttempts(getMaxRetryAttempts());
         sender.setRxBufSize(getRxBufSize());
         sender.setTxBufSize(getTxBufSize());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Fri Mar 17 12:57:04 2006
@@ -89,7 +89,7 @@
         try {
             NioReplicationThread[] receivers = new NioReplicationThread[getTcpThreadCount()];
             for ( int i=0; i<receivers.length; i++ ) {
-                receivers[i] = new NioReplicationThread();
+                receivers[i] = new NioReplicationThread(this);
                 receivers[i].setRxBufSize(getRxBufSize());
                 receivers[i].setOptions(getWorkerThreadOptions());
             }
@@ -173,7 +173,7 @@
 
                         channel.socket().setReceiveBufferSize(getRxBufSize());
                         channel.socket().setSendBufferSize(getTxBufSize());
-                        Object attach = new ObjectReader(channel, selector,this);
+                        Object attach = new ObjectReader(channel);
                         registerChannel(selector,
                                         channel,
                                         SelectionKey.OP_READ,

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReplicationThread.java Fri Mar 17 12:57:04 2006
@@ -23,6 +23,9 @@
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.tcp.Constants;
 import org.apache.catalina.tribes.tcp.WorkerThread;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ClusterData;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -44,8 +47,9 @@
     private ByteBuffer buffer = null;
     private SelectionKey key;
     private int rxBufSize;
-    public NioReplicationThread ()
+    public NioReplicationThread (ListenCallback callback)
     {
+        super(callback);
     }
 
     // loop forever waiting for work to do
@@ -56,7 +60,7 @@
         }else {
             buffer = ByteBuffer.allocate (getRxBufSize());
         }
-        while (doRun) {
+        while (isDoRun()) {
             try {
                 // sleep and release object lock
                 this.wait();
@@ -93,7 +97,7 @@
             }
             key = null;
             // done, ready for more, return to pool
-            this.pool.returnWorker (this);
+            getPool().returnWorker (this);
         }
     }
 
@@ -142,38 +146,27 @@
         
         int pkgcnt = reader.count();
 
-        
-
-        /**
-         * Use send ack here if you want to ack the request to the remote 
-         * server before completing the request
-         * This is considered an asynchronized request
-         */
-        if (sendAckAsync()) {
-            while ( pkgcnt > 0 ) {
-                sendAck(key,channel);
-                pkgcnt--;
-            }
-        }
-
-        //check to see if any data is available
-        pkgcnt = reader.execute();
-
-        if (log.isTraceEnabled()) {
-            log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() );
+        if ( pkgcnt > 0 ) {
+            ChannelMessage[] msgs = reader.execute();
+            for ( int i=0; i<msgs.length; i++ ) {
+                /**
+                 * Use send ack here if you want to ack the request to the remote 
+                 * server before completing the request
+                 * This is considered an asynchronous request
+                 */
+                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel);
+                //process the message
+                getCallback().messageDataReceived(msgs[i]);
+                /**
+                 * Use send ack here if you want the request to complete on this 
+                 * server before sending the ack to the remote server
+                 * This is considered a synchronized request
+                 */
+                if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel);
+            }                        
         }
 
-        /**
-         * Use send ack here if you want the request to complete on this 
-         * server before sending the ack to the remote server
-         * This is considered a synchronized request
-         */
-        if (sendAckSync()) {
-            while ( pkgcnt > 0 ) {
-                sendAck(key,channel);
-                pkgcnt--;
-            }
-        }        
+        
 
         
         if (count < 0) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Fri Mar 17 12:57:04 2006
@@ -75,7 +75,7 @@
      * @return boolean
      * @throws IOException
      */
-    public boolean process(SelectionKey key) throws IOException {
+    public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
         int ops = key.readyOps();
         key.interestOps(key.interestOps() & ~ops);
         if ( key.isConnectable() ) {
@@ -98,7 +98,7 @@
             boolean writecomplete = write(key);
             if ( writecomplete ) {
                 //we are completed, should we read an ack?
-                if ( getWaitForAck() ) {
+                if ( waitForAck ) {
                     //register to read the ack
                     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                 } else {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Fri Mar 17 12:57:04 2006
@@ -32,6 +32,7 @@
 import org.apache.catalina.tribes.tcp.SenderState;
 import org.apache.catalina.tribes.tcp.AbstractSender;
 import java.net.UnknownHostException;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * <p>Title: </p>
@@ -70,7 +71,7 @@
             //loop until complete, an error happens, or we timeout
             long delta = System.currentTimeMillis() - start;
             while ( (remaining>0) && (delta<getTimeout()) ) {
-                remaining -= doLoop(selectTimeout,getMaxRetryAttempts());
+                remaining -= doLoop(selectTimeout,getMaxRetryAttempts(),(Channel.SEND_OPTIONS_USE_ACK&msg.getOptions())==Channel.SEND_OPTIONS_USE_ACK);
                 delta = System.currentTimeMillis() - start;
             }
             if ( remaining > 0 ) {
@@ -89,7 +90,7 @@
         
     }
     
-    private int doLoop(long selectTimeOut, int maxAttempts) throws IOException, ChannelException {
+    private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck) throws IOException, ChannelException {
         int completed = 0;
         int selectedKeys = selector.select(selectTimeOut);
         
@@ -105,7 +106,7 @@
             sk.interestOps(sk.interestOps() & ~readyOps);
             NioSender sender = (NioSender) sk.attachment();
             try {
-                if (sender.process(sk)) {
+                if (sender.process(sk,waitForAck)) {
                     sender.reset();
                     completed++;
                     sender.setComplete(true);
@@ -186,7 +187,6 @@
                 sender.setDirectBuffer(getDirectBuffer());
                 sender.setRxBufSize(getRxBufSize());
                 sender.setTxBufSize(getTxBufSize());
-                sender.setWaitForAck(getWaitForAck());
                 sender.setTimeout(getTimeout());
                 sender.setKeepAliveCount(getKeepAliveCount());
                 sender.setKeepAliveTime(getKeepAliveTime());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Fri Mar 17 12:57:04 2006
@@ -56,7 +56,6 @@
         try {
             ParallelNioSender sender = new ParallelNioSender();
             sender.setTimeout(getTimeout());
-            sender.setWaitForAck(getWaitForAck());
             sender.setMaxRetryAttempts(getMaxRetryAttempts()); 
             sender.setDirectBuffer(getDirectBuffer());
             sender.setRxBufSize(getRxBufSize());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri Mar 17 12:57:04 2006
@@ -135,7 +135,7 @@
             //send out a map membership message, only wait for the first reply
             MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_START,
                                             false, null, null, null, wrap(channel.getLocalMember(false)));
-            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, timeout);
+            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, timeout);
             for (int i = 0; i < resp.length; i++) {
                 messageReceived(resp[i].getMessage(), resp[i].getSource());
             }
@@ -156,7 +156,7 @@
             //send a map membership stop message
             MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_STOP,
                                             false, null, null, null, wrap(channel.getLocalMember(false)));
-            if (channel != null) channel.send(channel.getMembers(), msg);
+            if (channel != null) channel.send(channel.getMembers(), msg,channel.SEND_OPTIONS_DEFAULT);
         } catch (ChannelException x) {
             log.warn("Unable to send stop message.", x);
         }
@@ -225,7 +225,7 @@
 
             }
             try {
-                channel.send(entry.getBackupNodes(), msg);
+                channel.send(entry.getBackupNodes(), msg,channel.SEND_OPTIONS_DEFAULT);
             } catch (ChannelException x) {
                 log.error("Unable to replicate data.", x);
             }
@@ -255,7 +255,7 @@
             if (backup != null) {
                 MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
                                                 null, null, null, null);
-                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, rpcTimeout);
+                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, rpcTimeout);
                 if (resp.length > 0) {
                     msg = (MapMessage) resp[0].getMessage();
                     ArrayList list = (ArrayList) msg.getValue();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 17 12:57:04 2006
@@ -132,12 +132,12 @@
         //publish the data out to all nodes
         MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
                                         (Serializable) key, null, null, wrap(backup));
-        getChannel().send(getMapMembers(), msg);
+        getChannel().send(getMapMembers(), msg,Channel.SEND_OPTIONS_DEFAULT);
 
         //publish the backup data to one node
         msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
                              (Serializable) key, (Serializable) value, null, wrap(backup));
-        getChannel().send(new Member[] {backup}, msg);
+        getChannel().send(new Member[] {backup}, msg, Channel.SEND_OPTIONS_DEFAULT);
         return wrap(backup);
     }
     
@@ -149,7 +149,7 @@
             try {
                 MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
                                                 (Serializable) key, null, null, null);
-                Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, getRpcTimeout());
+                Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
                 if (resp == null || resp.length == 0) {
                     //no responses
                     log.warn("Unable to retrieve remote object for key:" + key);
@@ -165,7 +165,7 @@
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
                     msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
-                    getChannel().send(backup,msg);
+                    getChannel().send(backup,msg,Channel.SEND_OPTIONS_DEFAULT);
                 }
 
                 entry.setBackupNodes(backup);
@@ -239,7 +239,7 @@
         MapEntry entry = (MapEntry)super.remove(key);
         MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
         try {
-            getChannel().send(getMapMembers(), msg);
+            getChannel().send(getMapMembers(), msg,Channel.SEND_OPTIONS_DEFAULT);
         } catch ( ChannelException x ) {
             log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Fri Mar 17 12:57:04 2006
@@ -112,7 +112,7 @@
         MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
                                         (Serializable) key, null, null, backup);
 
-        getChannel().send(getMapMembers(), msg);
+        getChannel().send(getMapMembers(), msg, Channel.SEND_OPTIONS_DEFAULT);
 
         return backup;
     }
@@ -182,7 +182,7 @@
         MapEntry entry = (MapEntry)super.remove(key);
         MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
         try {
-            getChannel().send(getMapMembers(), msg);
+            getChannel().send(getMapMembers(), msg, Channel.SEND_OPTIONS_DEFAULT);
         } catch (ChannelException x) {
             log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Fri Mar 17 12:57:04 2006
@@ -74,17 +74,18 @@
      */
     public Response[] send(Member[] destination, 
                            Serializable message,
-                           int options, 
+                           int rpcOptions, 
+                           int channelOptions,
                            long timeout) throws ChannelException {
         
         if ( destination==null || destination.length == 0 ) return new Response[0];
         RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
-        RpcCollector collector = new RpcCollector(key,options,destination.length,timeout);
+        RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);
         try {
             synchronized (collector) {
                 responseMap.put(key, collector);
                 RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
-                channel.send(destination, rmsg);
+                channel.send(destination, rmsg, channelOptions);
                 collector.wait(timeout);
             }
         } catch ( InterruptedException ix ) {
@@ -119,7 +120,7 @@
             rmsg.reply = true;
             rmsg.message = reply;
             try {
-                channel.send(new Member[] {sender}, rmsg);
+                channel.send(new Member[] {sender}, rmsg,Channel.SEND_OPTIONS_DEFAULT);
             }catch ( Exception x )  {
                 log.error("Unable to send back reply in RpcChannel.",x);
             }

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Fri Mar 17 12:57:04 2006
@@ -50,9 +50,8 @@
            .append("\n\t\t[-tcpselto tcpselectortimeout]") 
            .append("\n\t\t[-tcpthreads tcpthreadcount]") 
            .append("\n\t\t[-port tcplistenport]")
-           .append("\n\t\t[-ack true|false]")
+           .append("\n\t\t[-autobind tcpbindtryrange]")
            .append("\n\t\t[-ackto acktimeout]") 
-           .append("\n\t\t[-sync true|false]")
            .append("\n\t\t[-receiver org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
            .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultiSender]")
            .append("\n\t\t[-transport.xxx transport specific property]")
@@ -74,8 +73,6 @@
         String bind = "auto";
         int port = 4001;
         String mbind = null;
-        boolean ack = false;
-        boolean sync = false;
         boolean gzip = false;
         int tcpseltimeout = 100;
         int tcpthreadcount = 4;
@@ -88,6 +85,7 @@
         int ordersize = Integer.MAX_VALUE;
         boolean frag = false;
         int fragsize = 1024;
+        int autoBind = 10;
         Properties transportProperties = new Properties();
         String transport = "org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
         String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver";
@@ -97,6 +95,8 @@
                 bind = args[++i];
             } else if ("-port".equals(args[i])) {
                 port = Integer.parseInt(args[++i]);
+            } else if ("-autobind".equals(args[i])) {
+                autoBind = Integer.parseInt(args[++i]);
             } else if ("-tcpselto".equals(args[i])) {
                 tcpseltimeout = Integer.parseInt(args[++i]);
             } else if ("-tcpthreads".equals(args[i])) {
@@ -113,12 +113,8 @@
             } else if ("-fragsize".equals(args[i])) {
                 fragsize = Integer.parseInt(args[++i]);
                 System.out.println("Setting FragmentationInterceptor.maxSize="+fragsize);
-            } else if ("-ack".equals(args[i])) {
-                ack = Boolean.parseBoolean(args[++i]);
             } else if ("-ackto".equals(args[i])) {
                 acktimeout = Integer.parseInt(args[++i]);
-            } else if ("-sync".equals(args[i])) {
-                sync = Boolean.parseBoolean(args[++i]);
             } else if ("-transport".equals(args[i])) {
                 transport = args[++i];
             } else if (args[i]!=null && args[i].startsWith("transport.")) {
@@ -148,17 +144,15 @@
         rx.setTcpSelectorTimeout(tcpseltimeout);
         rx.setTcpThreadCount(tcpthreadcount);
         rx.getBind();
-        rx.setSendAck(ack);
-        rx.setSynchronized(sync);
         rx.setRxBufSize(43800);
         rx.setTxBufSize(25188);
+        rx.setAutoBind(autoBind);
 
         
         ReplicationTransmitter ps = new ReplicationTransmitter();
         System.out.println("Creating transport class="+transport);
         MultiPointSender sender = (MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance();
         sender.setTimeout(acktimeout);
-        sender.setWaitForAck(ack);
         sender.setMaxRetryAttempts(2);
         sender.setRxBufSize(43800);
         sender.setTxBufSize(25188);

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Fri Mar 17 12:57:04 2006
@@ -77,7 +77,7 @@
             try {
                 System.out.println("Sending ["+msg+"]");
                 long start = System.currentTimeMillis();
-                Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,timeout);
+                Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout);
                 System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
                 for ( int i=0; i<resp.length; i++ ) {
                     System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Fri Mar 17 12:57:04 2006
@@ -25,6 +25,7 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Channel;
 
 
 /**
@@ -55,6 +56,7 @@
     public long pause = 0;
     public boolean breakonChannelException = false;
     public long receiveStart = 0;
+    public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
     
     static int messageSize = 0;
     
@@ -136,7 +138,7 @@
                         if (debug) {
                             printArray(msg.getMessage());
                         }
-                        channel.send(channel.getMembers(), msg);
+                        channel.send(channel.getMembers(), msg, channelOptions);
                         if ( pause > 0 ) {
                             if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
                             Thread.sleep(pause);
@@ -307,6 +309,7 @@
                            "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
                            "[-threads numberofsenderthreads]  \n\t\t"+
                            "[-size messagesize]  \n\t\t"+
+                           "[-sendoptions channeloptions]  \n\t\t"+
                            "[-break (halts execution on exception)]\n"+
                            "\tChannel options:"+
                            ChannelCreator.usage()+"\n\n"+
@@ -324,6 +327,7 @@
         int stats = 10000;
         boolean breakOnEx = false;
         int threads = 1;
+        int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
         if ( args.length == 0 ) {
             args = new String[] {"-help"};
         }
@@ -340,6 +344,9 @@
             } else if ("-stats".equals(args[i])) {
                 stats = Integer.parseInt(args[++i]);
                 System.out.println("Stats every "+stats+" message");
+            } else if ("-sendoptions".equals(args[i])) {
+                channelOptions = Integer.parseInt(args[++i]);
+                System.out.println("Setting send options to "+channelOptions);
             } else if ("-size".equals(args[i])) {
                 size = Integer.parseInt(args[++i])-4;
                 System.out.println("Message size will be:"+(size+4)+" bytes");
@@ -358,6 +365,7 @@
         ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
         
         LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+        test.channelOptions = channelOptions;
         LoadMessage msg = new LoadMessage();
         
         messageSize = LoadMessage.getMessageSize(msg);

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java Fri Mar 17 12:57:04 2006
@@ -9,6 +9,7 @@
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * <p>Title: </p>
@@ -26,6 +27,7 @@
     private Selector selector = null;
     private int counter = 0;
     MemberImpl mbr;
+    private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
     public TestNioSender()  {
         
     }
@@ -39,6 +41,7 @@
         ClusterData data = new ClusterData(true);
         data.setMessage(new XByteBuffer(msg.getBytes(),false));
         data.setAddress(mbr);
+        
         return data;
     }
 
@@ -46,7 +49,6 @@
         selector = Selector.open();
         mbr = new MemberImpl("","localhost",4444,0);
         NioSender sender = new NioSender(mbr);
-        sender.setWaitForAck(false);
         sender.setDirectBuffer(true);
         sender.setSelector(selector);
         sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
@@ -79,7 +81,7 @@
                     int readyOps = sk.readyOps();
                     sk.interestOps(sk.interestOps() & ~readyOps);
                     NioSender sender = (NioSender) sk.attachment();
-                    if ( sender.process(sk) ) {
+                    if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) {
                         System.out.println("Message completed for handler:"+sender);
                         Thread.currentThread().sleep(2000);
                         sender.reset();

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=386711&r1=386710&r2=386711&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri Mar 17 12:57:04 2006
@@ -8,6 +8,9 @@
 Code Tasks:
 ===========================================
 
+22. sendAck and synchronized should not have to be an XML config;
+    they can be configured on a per-packet basis using ClusterData.getOptions()
+
 21. Implement a WAN membership layer, using a WANMbrInterceptor and a 
     WAN Router/Forwarder (Tipi on top of a ManagedChannel)
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message