tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r398970 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/ src/share/org/apache/catalina/tribes/membership/ src/share/org/apache/catalina/tribes/transport/ src/share/org/apache/catalina/tribes/transpor...
Date Tue, 02 May 2006 17:28:14 GMT
Author: fhanik
Date: Tue May  2 10:28:06 2006
New Revision: 398970

URL: http://svn.apache.org/viewcvs?rev=398970&view=rev
Log:
Fix shutdown message, wasnt getting broadcasted since it caches the serialized data for speed
purposes
Started implementing the fail ack command so that we can have a acknowledgement if the message
was handled successfully or not
Refactored channel exception so that we can track what exact exception a member send was exposed
to

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelException.java
Tue May  2 10:28:06 2006
@@ -57,29 +57,45 @@
         return buf.toString();
     }
     
-    public void addFaultyMember(Member[] mbrs) {
+    public void addFaultyMember(Member mbr, Exception x ) {
+        addFaultyMember(new FaultyMember(mbr,x));
+    }
+    
+    public void addFaultyMember(FaultyMember[] mbrs) {
         for (int i=0; mbrs!=null && i<mbrs.length; i++ ) {
             addFaultyMember(mbrs[i]);
         }
     }
 
-    public void addFaultyMember(Member mbr) {
+    public void addFaultyMember(FaultyMember mbr) {
         if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
         faultyMembers.add(mbr);
     }
 
-    public void setFaultyMembers(ArrayList faultyMembers) {
-        this.faultyMembers = faultyMembers;
+    public FaultyMember[] getFaultyMembers() {
+        if ( this.faultyMembers==null ) return new FaultyMember[0];
+        return (FaultyMember[])faultyMembers.toArray(new FaultyMember[faultyMembers.size()]);
     }
-
-    public void setFaultyMembers(Member[] faultyMembers) {
-        if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
-        this.faultyMembers.addAll(Arrays.asList(faultyMembers));
-    }
-
-    public Member[] getFaultyMembers() {
-        if ( this.faultyMembers==null ) return new Member[0];
-        return (Member[])faultyMembers.toArray(new Member[faultyMembers.size()]);
+    
+    public static class FaultyMember {
+        protected Exception cause;
+        protected Member member;
+        public FaultyMember(Member mbr, Exception x) { 
+            this.member = mbr;
+            this.cause = x;
+        }
+        
+        public Member getMember() {
+            return member;
+        }
+        
+        public Exception getCause() {
+            return cause;
+        }
+        
+        public String toString() {
+            return "FaultyMember:"+member.toString();
+        }
     }
 
 }

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java?rev=398970&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
(added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/RemoteProcessException.java
Tue May  2 10:28:06 2006
@@ -0,0 +1,46 @@
+/*
+ * Copyright 1999,2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.catalina.tribes;
+
+/**
+ * <p>Title: RemoteProcessException</p>
+ *
+ * <p>Description: Message thrown by a sender when USE_SYNC_ACK receives a FAIL_ACK_COMMAND.<br>
+ * This means that the message was received on the remote node but the processing of the
message failed.
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class RemoteProcessException
+    extends RuntimeException {
+    public RemoteProcessException() {
+        super();
+    }
+
+    public RemoteProcessException(String message) {
+        super(message);
+    }
+
+    public RemoteProcessException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RemoteProcessException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Tue May  2 10:28:06 2006
@@ -214,9 +214,12 @@
         //send a stop message
         byte[] payload = member.getPayload();
         member.setPayload(STOP_PAYLOAD);
+        member.getData(true,true);
         send();
         //restore payload
         member.setPayload(payload);
+        member.getData(true,true);
+        //leave mcast group
         socket.leaveGroup(address);
         serviceStartTime = Long.MAX_VALUE;
     }
@@ -236,7 +239,7 @@
                 log.debug("Mcast receive ping from member " + m);
 
             if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) {
-                if (log.isInfoEnabled()) log.info("Member has shutdown:" + m);
+                if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
                 membership.removeMcastMember(m);
                 service.memberDisappeared(m);
             } else if (membership.memberAlive(m)) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
Tue May  2 10:28:06 2006
@@ -173,6 +173,11 @@
      * @return byte[]
      */
     public byte[] getData(boolean getalive)  {
+        return getData(getalive,false);
+    }
+
+    public byte[] getData(boolean getalive, boolean reset)  {
+        if ( reset ) dataPkg = null;
         //look in cache first
         if ( dataPkg!=null ) {
             if ( getalive ) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/Constants.java
Tue May  2 10:28:06 2006
@@ -22,7 +22,7 @@
 /**
  * Manifest constants for the <code>org.apache.catalina.tribes.transport</code>
  * package.
- *
+ * @author Filip Hanik
  * @author Peter Rossbach
  * @version $Revision: 303753 $ $Date: 2005-03-14 15:24:30 -0600 (Mon, 14 Mar 2005) $
  */
@@ -31,7 +31,12 @@
 
     public static final String Package = "org.apache.catalina.tribes.transport";
     
+    /*
+     * Do not change any of these values!
+     */
     public static final byte[] ACK_DATA = new byte[] {6, 2, 3};
+    public static final byte[] FAIL_ACK_DATA = new byte[] {11, 0, 5};
     public static final byte[] ACK_COMMAND = XByteBuffer.createDataPackage(ACK_DATA);
+    public static final byte[] FAIL_ACK_COMMAND = XByteBuffer.createDataPackage(FAIL_ACK_DATA);
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
Tue May  2 10:28:06 2006
@@ -102,15 +102,20 @@
                  * server before completing the request
                  * This is considered an asynchronized request
                  */
-                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck();
-                //process the message
-                getCallback().messageDataReceived(msgs[i]);
-                /**
-                 * Use send ack here if you want the request to complete on this 
-                 * server before sending the ack to the remote server
-                 * This is considered a synchronized request
-                 */
-                if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck();
+                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                try {
+                    //process the message
+                    getCallback().messageDataReceived(msgs[i]);
+                    /**
+                     * Use send ack here if you want the request to complete on this
+                     * server before sending the ack to the remote server
+                     * This is considered a synchronized request
+                     */
+                    if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                }catch  ( Exception x ) {
+                    if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+                    log.error("Error thrown from messageDataReceived.",x);
+                }
             }                        
         }
 
@@ -145,10 +150,10 @@
      * @param key
      * @param channel
      */
-    protected void sendAck() {
+    protected void sendAck(byte[] command) {
         try {
             OutputStream out = socket.getOutputStream();
-            out.write(Constants.ACK_COMMAND);
+            out.write(command);
             out.flush();
             if (log.isTraceEnabled()) {
                 log.trace("ACK sent to " + socket.getPort());

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
Tue May  2 10:28:06 2006
@@ -32,6 +32,7 @@
 import org.apache.catalina.tribes.transport.SenderState;
 import org.apache.catalina.tribes.util.StringManager;
 import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.RemoteProcessException;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -272,6 +273,7 @@
     protected  void waitForAck() throws java.io.IOException {
         try {
             boolean ackReceived = false;
+            boolean failAckReceived = false;
             ackbuf.clear();
             int bytesRead = 0;
             int i = soIn.read();
@@ -280,7 +282,10 @@
                 byte d = (byte)i;
                 ackbuf.append(d);
                 if (ackbuf.doesPackageExist() ) {
-                    ackReceived = Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA);
+                    byte[] ackcmd = ackbuf.extractDataPackage(true);
+                    ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
+                    failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
+                    ackReceived = ackReceived || failAckReceived;
                     break;
                 }
                 i = soIn.read();
@@ -288,6 +293,8 @@
             if (!ackReceived) {
                 if (i == -1) throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(),
new Integer(socket.getLocalPort())));
                 else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(),
new Integer(socket.getLocalPort())));
+            } else if ( failAckReceived ) {
+                throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
             }
         } catch (IOException x) {
             String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()),
new Long(getTimeout()));

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
Tue May  2 10:28:06 2006
@@ -34,7 +34,6 @@
     private boolean autoConnect;
 
     public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws
ChannelException {
-        long start = System.currentTimeMillis();
         byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);
         BioSender[] senders = setupForSend(destination);
         ChannelException cx = null;
@@ -43,7 +42,7 @@
                 senders[i].sendMessage(data,(msg.getOptions()&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK);
             } catch (Exception x) {
                 if (cx == null) cx = new ChannelException(x);
-                cx.addFaultyMember(destination[i]);
+                cx.addFaultyMember(destination[i],x);
             }
         }
         if (cx!=null ) throw cx;
@@ -71,7 +70,7 @@
                 result[i].keepalive();
             }catch (Exception x ) {
                 if ( cx== null ) cx = new ChannelException(x);
-                cx.addFaultyMember(destination[i]);
+                cx.addFaultyMember(destination[i],x);
             }
         }
         if ( cx!=null ) throw cx;
@@ -94,7 +93,7 @@
                 sender.disconnect();
             }catch ( Exception e ) {
                 if ( x == null ) x = new ChannelException(e);
-                x.addFaultyMember(mbr);
+                x.addFaultyMember(mbr,e);
             }
             bioSenders.remove(mbr);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
Tue May  2 10:28:06 2006
@@ -31,7 +31,7 @@
             sender = (MultiPointSender)getSender();
             if (sender == null) {
                 ChannelException cx = new ChannelException("Unable to retrieve a data sender,
time out error.");
-                for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination[i]);
+                for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination[i],
new NullPointerException("Unable to retrieve a sender from the sender pool"));
                 throw cx;
             } else {
                 sender.sendMessage(destination, msg);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
Tue May  2 10:28:06 2006
@@ -152,20 +152,20 @@
                  * server before completing the request
                  * This is considered an asynchronized request
                  */
-                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel);
+                if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
                 try {
                     //process the message
                     getCallback().messageDataReceived(msgs[i]);
+                    /**
+                     * Use send ack here if you want the request to complete on this 
+                     * server before sending the ack to the remote server
+                     * This is considered a synchronized request
+                     */
+                    if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
                 }catch ( Exception e ) {
                     log.error("Processing of cluster message failed.",e);
+                    if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
                 } 
-                /**
-                 * Use send ack here if you want the request to complete on this 
-                 * server before sending the ack to the remote server
-                 * This is considered a synchronized request
-                 */
-                
-                if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel);
             }                        
         }
 
@@ -200,10 +200,10 @@
      * @param key
      * @param channel
      */
-    protected void sendAck(SelectionKey key, SocketChannel channel) {
+    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
         
         try {
-            channel.write(ByteBuffer.wrap(Constants.ACK_COMMAND));
+            channel.write(ByteBuffer.wrap(command));
             if (log.isTraceEnabled()) {
                 log.trace("ACK sent to " + channel.socket().getPort());
             }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
Tue May  2 10:28:06 2006
@@ -29,6 +29,7 @@
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.transport.AbstractSender;
 import org.apache.catalina.tribes.transport.DataSender;
+import org.apache.catalina.tribes.RemoteProcessException;
 
 /**
  * This class is NOT thread safe and should never be used with more than one thread at a
time
@@ -156,8 +157,11 @@
         ackbuf.append(readbuf,read);
         readbuf.clear();
         if (ackbuf.doesPackageExist() ) {
-            boolean result = Arrays.equals(ackbuf.extractDataPackage(true),org.apache.catalina.tribes.transport.Constants.ACK_DATA);
-            return result;
+            byte[] ackcmd = ackbuf.extractDataPackage(true);
+            boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
+            boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
+            if ( fack ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
+            return ack || fack;
         } else {
             return false;
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
Tue May  2 10:28:06 2006
@@ -93,7 +93,7 @@
                 //timeout has occured
                 cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
                 for (int i=0; i<senders.length; i++ ) {
-                    if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());
+                    if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination(),null);
                 }
                 throw cx;
             }
@@ -145,7 +145,7 @@
                 if ( !isConnected() ) {
                     log.warn("Not retrying send for:" + sender.getDestination().getName()
+ "; Sender is disconnected.");
                     ChannelException cx = new ChannelException("Send failed, and sender is
disconnected. Not retrying.",x);
-                    cx.addFaultyMember(sender.getDestination());
+                    cx.addFaultyMember(sender.getDestination(),x);
                     throw cx;
                 }
                 
@@ -161,7 +161,7 @@
                     }
                 } else {
                     ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+"
max:"+maxAttempts,x);
-                    cx.addFaultyMember(sender.getDestination());
+                    cx.addFaultyMember(sender.getDestination(),x);
                     throw cx;
                 }//end if
             }
@@ -177,7 +177,7 @@
                 if (!senders[i].isConnected()) senders[i].connect();
             }catch ( IOException io ) {
                 if ( x==null ) x = new ChannelException(io);
-                x.addFaultyMember(senders[i].getDestination());
+                x.addFaultyMember(senders[i].getDestination(),io);
             }
         }
         if ( x != null ) throw x;
@@ -190,7 +190,7 @@
                 senders[i].setMessage(data);
             }catch ( IOException io ) {
                 if ( x==null ) x = new ChannelException(io);
-                x.addFaultyMember(senders[i].getDestination());
+                x.addFaultyMember(senders[i].getDestination(),io);
             }
         }
         if ( x != null ) throw x;
@@ -208,7 +208,7 @@
                     nioSenders.put(destination[i], sender);
                 }catch ( UnknownHostException x ) {
                     if ( cx == null ) cx = new ChannelException("Unable to setup NioSender.",x);
-                    cx.addFaultyMember(destination[i]);
+                    cx.addFaultyMember(destination[i],x);
                 }
             }
             if ( sender != null ) {
@@ -243,7 +243,7 @@
                 sender.disconnect();
             }catch ( Exception e ) {
                 if ( x == null ) x = new ChannelException(e);
-                x.addFaultyMember(mbr);
+                x.addFaultyMember(mbr,e);
             }
             nioSenders.remove(mbr);
         }

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
(original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
Tue May  2 10:28:06 2006
@@ -149,7 +149,7 @@
                     } catch (ChannelException x) {
                         if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
                         log.error("Unable to send message:"+x.getMessage());
-                        Member[] faulty = x.getFaultyMembers();
+                        ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                         for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                         --counter;
                         if ( this.breakonChannelException ) throw x;
@@ -318,6 +318,7 @@
                            "[-size messagesize]  \n\t\t"+
                            "[-sendoptions channeloptions]  \n\t\t"+
                            "[-break (halts execution on exception)]\n"+
+                           "[-shutdown (issues a channel.stop() command after send is completed)]\n"+
                            "\tChannel options:"+
                            ChannelCreator.usage()+"\n\n"+
                            "Example:\n\t"+
@@ -334,6 +335,7 @@
         int stats = 10000;
         boolean breakOnEx = false;
         int threads = 1;
+        boolean shutdown = false;
         int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
         if ( args.length == 0 ) {
             args = new String[] {"-help"};
@@ -348,6 +350,8 @@
                 pause = Long.parseLong(args[++i])*1000;
             } else if ("-break".equals(args[i])) {
                 breakOnEx = true;
+            } else if ("-shutdown".equals(args[i])) {
+                shutdown = true;
             } else if ("-stats".equals(args[i])) {
                 stats = Integer.parseInt(args[++i]);
                 System.out.println("Stats every "+stats+" message");
@@ -388,7 +392,7 @@
             test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
         }
         test.run();
-        
+        if ( shutdown && send ) channel.stop(channel.DEFAULT);
         System.out.println("System test complete, sleeping to let threads finish.");
         Thread.sleep(60*1000*60);
     } 

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=398970&r1=398969&r2=398970&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue May  2 10:28:06 2006
@@ -48,8 +48,6 @@
 
 41. Build a tipi that is a soft membership
 
-40. channel.stop() - should broadcast a stop message, to avoid timeout
-
 38. Make the AbstractReplicatedMap accept non serializable elements, but just don't replicate
them
 
 36. UDP Sender and Receiver, initially without flow control and guaranteed delivery.
@@ -251,4 +249,7 @@
 and no one accepts it, then it can reply immediately. this way the rpc sender doesn't have
to time out.
 
 39. Support for IPv6
-Notes: Completed. The membership now carries a variable length host address to support IPv6
\ No newline at end of file
+Notes: Completed. The membership now carries a variable length host address to support IPv6
+
+40. channel.stop() - should broadcast a stop message, to avoid timeout
+Notes: Completed. 
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message