Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 93315 invoked from network); 8 Feb 2011 20:18:44 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 8 Feb 2011 20:18:44 -0000 Received: (qmail 94052 invoked by uid 500); 8 Feb 2011 20:18:43 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 93947 invoked by uid 500); 8 Feb 2011 20:18:43 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 93938 invoked by uid 99); 8 Feb 2011 20:18:43 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Feb 2011 20:18:43 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Feb 2011 20:18:40 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BBDB2238889B; Tue, 8 Feb 2011 20:18:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1068549 - in /tomcat/trunk/java/org/apache/catalina/tribes/group: ExtendedRpcCallback.java RpcChannel.java Date: Tue, 08 Feb 2011 20:18:19 -0000 To: dev@tomcat.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110208201819.BBDB2238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fhanik Date: Tue Feb 8 20:18:19 2011 New Revision: 1068549 URL: http://svn.apache.org/viewvc?rev=1068549&view=rev Log: https://issues.apache.org/bugzilla/show_bug.cgi?id=50667 Allow a replier to get confirmation if the reply message was sent successfully or if it failed Added: tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java Added: tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java?rev=1068549&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java Tue Feb 8 20:18:19 2011 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group; + +import java.io.Serializable; + +import org.apache.catalina.tribes.ErrorHandler; +import org.apache.catalina.tribes.Member; +/** + * Extension to the {@link RpcCallback} interface. Allows a RPC messenger to get a confirmation if the reply + * was sent successfully to the original sender. + * @author fhanik + * + */ +public interface ExtendedRpcCallback extends RpcCallback { + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender requested that reply + * @param reason - the reason the reply failed + * @return true if the callback would like to reattempt the reply, false otherwise + */ + public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason); + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender requested that reply + */ + public void replySucceeded(Serializable request, Serializable response, Member sender); +} Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java?rev=1068549&r1=1068548&r2=1068549&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java Tue Feb 8 20:18:19 2011 @@ -24,7 +24,9 @@ import java.util.HashMap; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -126,14 +128,46 @@ public class RpcChannel implements Chann }//synchronized }//end if } else{ + boolean finished = false; + final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null; + boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); Serializable reply = callback.replyRequest(rmsg.message,sender); - rmsg.reply = true; - rmsg.message = reply; - try { - channel.send(new Member[] {sender}, rmsg, - replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); - }catch ( Exception x ) { - log.error("Unable to send back reply in RpcChannel.",x); + while (!finished) { + ErrorHandler handler = null; + final Serializable request = msg; + final Serializable response = reply; + final Member fsender = sender; + if (excallback!=null && asyncReply) { + handler = new ErrorHandler() { + public void handleError(ChannelException x, UniqueId id) { + excallback.replyFailed(request, response, fsender, x); + } + + public void handleCompletion(UniqueId id) { + excallback.replySucceeded(request, response, fsender); + } + }; + } + rmsg.reply = true; + rmsg.message = reply; + try { + if (handler!=null) { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); + } else { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + } + finished = true; + if (excallback != null && !asyncReply) { + excallback.replySucceeded(rmsg.message, reply, sender); + } + }catch ( Exception x ) { + if (excallback != null && !asyncReply) { + finished = !excallback.replyFailed(rmsg.message, reply, sender, x); + } else { + finished = true; + log.error("Unable to send back reply in RpcChannel.",x); + } + } } }//end if } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org