Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 89709 invoked from network); 18 May 2006 22:34:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 18 May 2006 22:34:42 -0000 Received: (qmail 1921 invoked by uid 500); 18 May 2006 22:34:38 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 1473 invoked by uid 500); 18 May 2006 22:34:37 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 1462 invoked by uid 500); 18 May 2006 22:34:37 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 1459 invoked by uid 99); 18 May 2006 22:34:37 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 May 2006 15:34:37 -0700 X-ASF-Spam-Status: No, hits=0.6 required=10.0 tests=NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 May 2006 15:34:36 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 223031A9835; Thu, 18 May 2006 15:34:16 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r407648 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/transport/bio/util/ test/java/org/apache/catalina/tribes/demos/ Date: Thu, 18 May 2006 22:34:15 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060518223416.223031A9835@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: fhanik Date: Thu May 18 15:34:14 2006 New Revision: 407648 URL: http://svn.apache.org/viewvc?rev=407648&view=rev Log: Implemented the async using java.util.concurrent for the 1.5 package. much better results Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=407648&r1=407647&r2=407648&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Thu May 18 15:34:14 2006 @@ -14,8 +14,14 @@ */ package org.apache.catalina.tribes.group.interceptors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.transport.bio.util.LinkObject; + /** * * Same implementation as the MessageDispatchInterceptor @@ -28,6 +34,7 @@ public class MessageDispatch15Interceptor extends MessageDispatchInterceptor { protected AtomicLong currentSize = new AtomicLong(0); + protected LinkedBlockingQueue queue = new LinkedBlockingQueue(); public long getCurrentSize() { return currentSize.get(); @@ -41,5 +48,34 @@ currentSize.set(value); return value; } + + public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) { + LinkObject obj = new LinkObject(msg,destination,payload); + return queue.offer(obj); + } + + public LinkObject removeFromQueue() { + LinkObject head = null; + try { + head = (LinkObject)queue.take(); + }catch ( InterruptedException x ) {} + return head; + } + + public void startQueue() { + msgDispatchThread = new Thread(this); + msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread"); + msgDispatchThread.setDaemon(true); + msgDispatchThread.setPriority(Thread.MAX_PRIORITY); + run = true; + msgDispatchThread.start(); + } + + public void stopQueue() { + run = false; + msgDispatchThread.interrupt(); + setAndGetCurrentSize(0); + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407648&r1=407647&r2=407648&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 15:34:14 2006 @@ -37,14 +37,15 @@ * @version 1.0 */ public class MessageDispatchInterceptor extends ChannelInterceptorBase implements Runnable { - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class); + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class); - private long maxQueueSize = 1024*1024*64; //64MB - private FastQueue queue = new FastQueue(); - private boolean run = false; - private Thread msgDispatchThread = null; + protected long maxQueueSize = 1024*1024*64; //64MB + protected FastQueue queue = new FastQueue(); + protected boolean run = false; + protected Thread msgDispatchThread = null; protected long currentSize = 0; - private boolean useDeepClone = true; + protected boolean useDeepClone = true; + protected boolean alwaysSend = true; public MessageDispatchInterceptor() { setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS); @@ -53,19 +54,51 @@ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS; if ( async && run ) { - if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+getCurrentSize()+" bytes."); + if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) { + if ( alwaysSend ) { + super.sendMessage(destination,msg,payload); + return; + } else { + throw new ChannelException("Asynchronous queue is full, reached its limit of " + maxQueueSize +" bytes, current:" + getCurrentSize() + " bytes."); + }//end if + }//end if //add to queue if ( useDeepClone ) msg = (ChannelMessage)msg.deepclone(); - if (!queue.add(msg, destination, payload) ) { + if (!addToQueue(msg, destination, payload) ) { throw new ChannelException("Unable to add the message to the async queue, queue bug?"); } addAndGetCurrentSize(msg.getMessage().getLength()); } else { - System.out.println("Not queueing the message"); super.sendMessage(destination, msg, payload); } } + public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) { + return queue.add(msg,destination,payload); + } + + public LinkObject removeFromQueue() { + return queue.remove(); + } + + public void startQueue() { + msgDispatchThread = new Thread(this); + msgDispatchThread.setName("MessageDispatchInterceptor.MessageDispatchThread"); + msgDispatchThread.setDaemon(true); + msgDispatchThread.setPriority(Thread.MAX_PRIORITY); + queue.setEnabled(true); + run = true; + msgDispatchThread.start(); + } + + public void stopQueue() { + run = false; + msgDispatchThread.interrupt(); + queue.setEnabled(false); + setAndGetCurrentSize(0); + } + + public void setOptionFlag(int flag) { if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) log.warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use."); super.setOptionFlag(flag); @@ -106,13 +139,7 @@ if (!run ) { synchronized (this) { if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender - msgDispatchThread = new Thread(this); - msgDispatchThread.setName("MessageDispatchThread"); - msgDispatchThread.setDaemon(true); - msgDispatchThread.setPriority(Thread.MAX_PRIORITY); - queue.setEnabled(true); - run = true; - msgDispatchThread.start(); + startQueue(); }//end if }//sync }//end if @@ -125,10 +152,7 @@ if ( run ) { synchronized (this) { if ( run && ((svc & Channel.SND_TX_SEQ)==svc)) { - run = false; - msgDispatchThread.interrupt(); - queue.setEnabled(false); - setAndGetCurrentSize(0); + stopQueue(); }//end if }//sync }//end if @@ -138,7 +162,7 @@ public void run() { while ( run ) { - LinkObject link = queue.remove(); + LinkObject link = removeFromQueue(); if ( link == null ) continue; //should not happen unless we exceed wait time while ( link != null && run ) { ChannelMessage msg = link.data(); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java?rev=407648&r1=407647&r2=407648&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java Thu May 18 15:34:14 2006 @@ -71,6 +71,10 @@ public LinkObject next() { return next; } + + public void setNext(LinkObject next) { + this.next = next; + } /** * Get the data object from the element. Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407648&r1=407647&r2=407648&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 15:34:14 2006 @@ -71,7 +71,7 @@ .append("\n\t\t[-throughput]") .append("\n\t\t[-failuredetect]") .append("\n\t\t[-async]") - .append("\n\t\t[-asyncsize maxqueuesizeinbytes]"); + .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]"); return buf; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org