Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 89609 invoked from network); 1 Jul 2006 22:52:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 1 Jul 2006 22:52:45 -0000 Received: (qmail 99718 invoked by uid 500); 1 Jul 2006 22:52:37 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 99663 invoked by uid 500); 1 Jul 2006 22:52:37 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 99652 invoked by uid 500); 1 Jul 2006 22:52:37 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 99649 invoked by uid 99); 1 Jul 2006 22:52:37 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Jul 2006 15:52:37 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Jul 2006 15:52:35 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 1DC261A983A; Sat, 1 Jul 2006 15:52:15 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r418516 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: io/ChannelData.java io/ObjectReader.java io/XByteBuffer.java transport/nio/NioReceiver.java transport/nio/NioReplicationThread.java Date: Sat, 01 Jul 2006 22:52:13 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060701225215.1DC261A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: fhanik Date: Sat Jul 1 15:52:11 2006 New Revision: 418516 URL: http://svn.apache.org/viewvc?rev=418516&view=rev Log: Major improvements, there seems to be an error with the thread handling on the NIOReceiver and the hand off for the worker thread Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul 1 15:52:11 2006 @@ -232,6 +232,7 @@ data.setAddress(MemberImpl.getMember(addr)); offset += addr.length; //addr data int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset); + offset += 4; //xsize length System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize); xbuf.setLength(xsize); data.message = xbuf; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Sat Jul 1 15:52:11 2006 @@ -41,6 +41,8 @@ private XByteBuffer buffer; protected long lastAccess = System.currentTimeMillis(); + + protected boolean accessed = false; /** * Creates an ObjectReader for a TCP NIO socket channel @@ -62,6 +64,18 @@ log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes."); this.buffer = new XByteBuffer(43800,true); } + } + + public synchronized void access() { + this.accessed = true; + } + + public synchronized void finish() { + this.accessed = false; + } + + public boolean isAccessed() { + return this.accessed; } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul 1 15:52:11 2006 @@ -317,6 +317,7 @@ if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); int size = toInt(buf, START_DATA.length); XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false); + xbuf.setLength(size); System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size); if (clearFromBuffer) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 15:52:11 2006 @@ -1,321 +1,377 @@ -/* - * Copyright 1999,2004-2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport.nio; - -import java.io.IOException; -import java.net.ServerSocket; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; - -import org.apache.catalina.tribes.ChannelReceiver; -import org.apache.catalina.tribes.io.ListenCallback; -import org.apache.catalina.tribes.io.ObjectReader; -import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.ReceiverBase; -import org.apache.catalina.tribes.transport.ThreadPool; -import org.apache.catalina.tribes.transport.WorkerThread; -import org.apache.catalina.tribes.util.StringManager; -import java.util.LinkedList; - -/** - * @author Filip Hanik - * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ - */ -public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { - - protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); - - /** - * The string manager for this package. - */ - protected StringManager sm = StringManager.getManager(Constants.Package); - - /** - * The descriptive information about this implementation. - */ - private static final String info = "NioReceiver/1.0"; - - private Selector selector = null; - private ServerSocketChannel serverChannel = null; - - protected LinkedList events = new LinkedList(); -// private Object interestOpsMutex = new Object(); - - public NioReceiver() { - } - - /** - * Return descriptive information about this implementation and the - * corresponding version number, in the format - * <description>/<version>. - */ - public String getInfo() { - return (info); - } - -// public Object getInterestOpsMutex() { -// return interestOpsMutex; -// } - - public void stop() { - this.stopListening(); - } - - /** - * start cluster receiver - * @throws Exception - * @see org.apache.catalina.tribes.ClusterReceiver#start() - */ - public void start() throws IOException { - try { -// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); - setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); - } catch (Exception x) { - log.fatal("ThreadPool can initilzed. Listener not started", x); - if ( x instanceof IOException ) throw (IOException)x; - else throw new IOException(x.getMessage()); - } - try { - getBind(); - bind(); - Thread t = new Thread(this, "NioReceiver"); - t.setDaemon(true); - t.start(); - } catch (Exception x) { - log.fatal("Unable to start cluster receiver", x); - if ( x instanceof IOException ) throw (IOException)x; - else throw new IOException(x.getMessage()); - } - } - - public WorkerThread getWorkerThread() { - NioReplicationThread thread = new NioReplicationThread(this,this); - thread.setUseBufferPool(this.getUseBufferPool()); - thread.setRxBufSize(getRxBufSize()); - thread.setOptions(getWorkerThreadOptions()); - return thread; - } - - - - protected void bind() throws IOException { - // allocate an unbound server socket channel - serverChannel = ServerSocketChannel.open(); - // Get the associated ServerSocket to bind it with - ServerSocket serverSocket = serverChannel.socket(); - // create a new Selector for use below - selector = Selector.open(); - // set the port the server channel will listen to - //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); - bind(serverSocket,getTcpListenPort(),getAutoBind()); - // set non-blocking mode for the listening socket - serverChannel.configureBlocking(false); - // register the ServerSocketChannel with the Selector - serverChannel.register(selector, SelectionKey.OP_ACCEPT); - - } - - public void addEvent(Runnable event) { - if ( selector != null ) { - synchronized (events) { - events.add(event); - } - selector.wakeup(); - } - } - - public void events() { - if ( events.size() == 0 ) return; - synchronized (events) { - Runnable r = null; - while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { - try { - r.run(); - } catch ( Exception x ) { - log.error("",x); - } - } - events.clear(); - } - } - - /** - * get data from channel and store in byte array - * send it to cluster - * @throws IOException - * @throws java.nio.channels.ClosedChannelException - */ - protected void listen() throws Exception { - if (doListen()) { - log.warn("ServerSocketChannel already started"); - return; - } - - setListen(true); - - while (doListen() && selector != null) { - // this may block for a long time, upon return the - // selected set contains keys of the ready channels - try { - events(); - int n = selector.select(getTcpSelectorTimeout()); - if (n == 0) { - //there is a good chance that we got here - //because the TcpReplicationThread called - //selector wakeup(). - //if that happens, we must ensure that that - //thread has enough time to call interestOps -// synchronized (interestOpsMutex) { - //if we got the lock, means there are no - //keys trying to register for the - //interestOps method -// } - continue; // nothing to do - } - // get an iterator over the set of selected keys - Iterator it = selector.selectedKeys().iterator(); - // look at each key in the selected set - while (it.hasNext()) { - SelectionKey key = (SelectionKey) it.next(); - // Is a new connection coming in? - if (key.isAcceptable()) { - ServerSocketChannel server = (ServerSocketChannel) key.channel(); - SocketChannel channel = server.accept(); - channel.socket().setReceiveBufferSize(getRxBufSize()); - channel.socket().setSendBufferSize(getTxBufSize()); - channel.socket().setTcpNoDelay(getTcpNoDelay()); - channel.socket().setKeepAlive(getSoKeepAlive()); - channel.socket().setOOBInline(getOoBInline()); - channel.socket().setReuseAddress(getSoReuseAddress()); - channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); - channel.socket().setTrafficClass(getSoTrafficClass()); - channel.socket().setSoTimeout(getTimeout()); - Object attach = new ObjectReader(channel); - registerChannel(selector, - channel, - SelectionKey.OP_READ, - attach); - } - // is there data to read on this channel? - if (key.isReadable()) { - readDataFromSocket(key); - } else { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - } - - // remove key from selected set, it's been handled - it.remove(); - } - } catch (java.nio.channels.ClosedSelectorException cse) { - // ignore is normal at shutdown or stop listen socket - } catch (java.nio.channels.CancelledKeyException nx) { - log.warn("Replication client disconnected, error when polling key. Ignoring client."); - } catch (Throwable x) { - try { - log.error("Unable to process request in NioReceiver", x); - }catch ( Throwable tx ) { - tx.printStackTrace(); - } - } - - } - serverChannel.close(); - if (selector != null) - selector.close(); - } - - /** - * Close Selector. - * - * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening() - */ - protected void stopListening() { - // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529 - setListen(false); - if (selector != null) { - try { - for (int i = 0; i < getMaxThreads(); i++) { - selector.wakeup(); - } - selector.close(); - } catch (Exception x) { - log.error("Unable to close cluster receiver selector.", x); - } finally { - selector = null; - } - } - } - - // ---------------------------------------------------------- - - /** - * Register the given channel with the given selector for - * the given operations of interest - */ - protected void registerChannel(Selector selector, - SelectableChannel channel, - int ops, - Object attach) throws Exception { - if (channel == null)return; // could happen - // set the new channel non-blocking - channel.configureBlocking(false); - // register it with the selector - channel.register(selector, ops, attach); - } - - /** - * Start thread and listen - */ - public void run() { - try { - listen(); - } catch (Exception x) { - log.error("Unable to run replication listener.", x); - } - } - - // ---------------------------------------------------------- - - /** - * Sample data handler method for a channel with data ready to read. - * @param key A SelectionKey object associated with a channel - * determined by the selector to be ready for reading. If the - * channel returns an EOF condition, it is closed here, which - * automatically invalidates the associated key. The selector - * will then de-register the channel on the next select call. - */ - protected void readDataFromSocket(SelectionKey key) throws Exception { - NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); - if (worker == null) { - // No threads available, do nothing, the selection - // loop will keep calling this method until a - // thread becomes available. - // FIXME: This design could be improved. - if (log.isDebugEnabled()) - log.debug("No TcpReplicationThread available"); - } else { - // invoking this wakes up the worker thread then returns - worker.serviceChannel(key); - } - } - - -} +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport.nio; + +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.transport.Constants; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ThreadPool; +import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.util.StringManager; +import java.util.LinkedList; +import java.util.Set; +import java.nio.channels.CancelledKeyException; + +/** + * @author Filip Hanik + * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ + */ +public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); + + /** + * The string manager for this package. + */ + protected StringManager sm = StringManager.getManager(Constants.Package); + + /** + * The descriptive information about this implementation. + */ + private static final String info = "NioReceiver/1.0"; + + private Selector selector = null; + private ServerSocketChannel serverChannel = null; + + protected LinkedList events = new LinkedList(); +// private Object interestOpsMutex = new Object(); + + public NioReceiver() { + } + + /** + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <description>/<version>. + */ + public String getInfo() { + return (info); + } + +// public Object getInterestOpsMutex() { +// return interestOpsMutex; +// } + + public void stop() { + this.stopListening(); + } + + /** + * start cluster receiver + * @throws Exception + * @see org.apache.catalina.tribes.ClusterReceiver#start() + */ + public void start() throws IOException { + try { +// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); + setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); + } catch (Exception x) { + log.fatal("ThreadPool can initilzed. Listener not started", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); + } + try { + getBind(); + bind(); + Thread t = new Thread(this, "NioReceiver"); + t.setDaemon(true); + t.start(); + } catch (Exception x) { + log.fatal("Unable to start cluster receiver", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); + } + } + + public WorkerThread getWorkerThread() { + NioReplicationThread thread = new NioReplicationThread(this,this); + thread.setUseBufferPool(this.getUseBufferPool()); + thread.setRxBufSize(getRxBufSize()); + thread.setOptions(getWorkerThreadOptions()); + return thread; + } + + + + protected void bind() throws IOException { + // allocate an unbound server socket channel + serverChannel = ServerSocketChannel.open(); + // Get the associated ServerSocket to bind it with + ServerSocket serverSocket = serverChannel.socket(); + // create a new Selector for use below + selector = Selector.open(); + // set the port the server channel will listen to + //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + bind(serverSocket,getTcpListenPort(),getAutoBind()); + // set non-blocking mode for the listening socket + serverChannel.configureBlocking(false); + // register the ServerSocketChannel with the Selector + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + } + + public void addEvent(Runnable event) { + if ( selector != null ) { + synchronized (events) { + events.add(event); + } + selector.wakeup(); + } + } + + public void events() { + if ( events.size() == 0 ) return; + synchronized (events) { + Runnable r = null; + while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { + try { + r.run(); + } catch ( Exception x ) { + log.error("",x); + } + } + events.clear(); + } + } + + public static void cancelledKey(SelectionKey key) { + try { + ObjectReader ka = (ObjectReader)key.attachment(); + key.cancel(); + key.channel().close(); + key.attach(null); + if ( ka != null ) ka.finish(); + } catch (IOException e) { + if (log.isDebugEnabled()) log.debug("", e); + // Ignore + } + } + + protected void socketTimeouts() { + //timeout + Set keys = selector.keys(); + long now = System.currentTimeMillis(); + for (Iterator iter = keys.iterator(); iter.hasNext(); ) { + SelectionKey key = (SelectionKey) iter.next(); + try { +// if (key.interestOps() == SelectionKey.OP_READ) { +// //only timeout sockets that we are waiting for a read from +// ObjectReader ka = (ObjectReader) key.attachment(); +// long delta = now - ka.getLastAccess(); +// if (delta > (long) getTimeout()) { +// cancelledKey(key); +// } +// } +// else + if ( key.interestOps() == 0 ) { + //check for keys that didn't make it in. + ObjectReader ka = (ObjectReader) key.attachment(); + if ( ka != null ) { + long delta = now - ka.getLastAccess(); + if (delta > (long) getTimeout() && (!ka.isAccessed())) { + log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms."); + ka.setLastAccess(now); + key.interestOps(SelectionKey.OP_READ); + }//end if + } else { + cancelledKey(key); + }//end if + }//end if + }catch ( CancelledKeyException ckx ) { + cancelledKey(key); + } + } + } + + + /** + * get data from channel and store in byte array + * send it to cluster + * @throws IOException + * @throws java.nio.channels.ClosedChannelException + */ + protected void listen() throws Exception { + if (doListen()) { + log.warn("ServerSocketChannel already started"); + return; + } + + setListen(true); + + while (doListen() && selector != null) { + // this may block for a long time, upon return the + // selected set contains keys of the ready channels + try { + events(); + socketTimeouts(); + int n = selector.select(getTcpSelectorTimeout()); + if (n == 0) { + //there is a good chance that we got here + //because the TcpReplicationThread called + //selector wakeup(). + //if that happens, we must ensure that that + //thread has enough time to call interestOps +// synchronized (interestOpsMutex) { + //if we got the lock, means there are no + //keys trying to register for the + //interestOps method +// } + continue; // nothing to do + } + // get an iterator over the set of selected keys + Iterator it = selector.selectedKeys().iterator(); + // look at each key in the selected set + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + // Is a new connection coming in? + if (key.isAcceptable()) { + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + channel.socket().setReceiveBufferSize(getRxBufSize()); + channel.socket().setSendBufferSize(getTxBufSize()); + channel.socket().setTcpNoDelay(getTcpNoDelay()); + channel.socket().setKeepAlive(getSoKeepAlive()); + channel.socket().setOOBInline(getOoBInline()); + channel.socket().setReuseAddress(getSoReuseAddress()); + channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + channel.socket().setTrafficClass(getSoTrafficClass()); + channel.socket().setSoTimeout(getTimeout()); + Object attach = new ObjectReader(channel); + registerChannel(selector, + channel, + SelectionKey.OP_READ, + attach); + } + // is there data to read on this channel? + if (key.isReadable()) { + readDataFromSocket(key); + } else { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + + // remove key from selected set, it's been handled + it.remove(); + } + } catch (java.nio.channels.ClosedSelectorException cse) { + // ignore is normal at shutdown or stop listen socket + } catch (java.nio.channels.CancelledKeyException nx) { + log.warn("Replication client disconnected, error when polling key. Ignoring client."); + } catch (Throwable x) { + try { + log.error("Unable to process request in NioReceiver", x); + }catch ( Throwable tx ) { + //in case an out of memory error, will affect the logging framework as well + tx.printStackTrace(); + } + } + + } + serverChannel.close(); + if (selector != null) + selector.close(); + } + + + + /** + * Close Selector. + * + * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening() + */ + protected void stopListening() { + // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529 + setListen(false); + if (selector != null) { + try { + for (int i = 0; i < getMaxThreads(); i++) { + selector.wakeup(); + } + selector.close(); + } catch (Exception x) { + log.error("Unable to close cluster receiver selector.", x); + } finally { + selector = null; + } + } + } + + // ---------------------------------------------------------- + + /** + * Register the given channel with the given selector for + * the given operations of interest + */ + protected void registerChannel(Selector selector, + SelectableChannel channel, + int ops, + Object attach) throws Exception { + if (channel == null)return; // could happen + // set the new channel non-blocking + channel.configureBlocking(false); + // register it with the selector + channel.register(selector, ops, attach); + } + + /** + * Start thread and listen + */ + public void run() { + try { + listen(); + } catch (Exception x) { + log.error("Unable to run replication listener.", x); + } + } + + // ---------------------------------------------------------- + + /** + * Sample data handler method for a channel with data ready to read. + * @param key A SelectionKey object associated with a channel + * determined by the selector to be ready for reading. If the + * channel returns an EOF condition, it is closed here, which + * automatically invalidates the associated key. The selector + * will then de-register the channel on the next select call. + */ + protected void readDataFromSocket(SelectionKey key) throws Exception { + NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); + if (worker == null) { + // No threads available, do nothing, the selection + // loop will keep calling this method until a + // thread becomes available. + // FIXME: This design could be improved. + if (log.isDebugEnabled()) + log.debug("No TcpReplicationThread available"); + } else { + // invoking this wakes up the worker thread then returns + worker.serviceChannel(key); + } + } + + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 15:52:11 2006 @@ -1,244 +1,256 @@ -/* - * Copyright 1999,2004 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport.nio; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; - -import org.apache.catalina.tribes.io.ObjectReader; -import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.WorkerThread; -import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.io.ListenCallback; -import org.apache.catalina.tribes.io.ChannelData; -import org.apache.catalina.tribes.io.BufferPool; -import java.nio.channels.CancelledKeyException; - -/** - * A worker thread class which can drain channels and echo-back the input. Each - * instance is constructed with a reference to the owning thread pool object. - * When started, the thread loops forever waiting to be awakened to service the - * channel associated with a SelectionKey object. The worker is tasked by - * calling its serviceChannel() method with a SelectionKey object. The - * serviceChannel() method stores the key reference in the thread object then - * calls notify() to wake it up. When the channel has been drained, the worker - * thread returns itself to its parent pool. - * - * @author Filip Hanik - * - * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ - */ -public class NioReplicationThread extends WorkerThread { - - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class ); - private ByteBuffer buffer = null; - private SelectionKey key; - private int rxBufSize; - private NioReceiver receiver; - public NioReplicationThread (ListenCallback callback, NioReceiver receiver) - { - super(callback); - this.receiver = receiver; - } - - // loop forever waiting for work to do - public synchronized void run() - { - if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { - buffer = ByteBuffer.allocateDirect(getRxBufSize()); - }else { - buffer = ByteBuffer.allocate (getRxBufSize()); - } - while (isDoRun()) { - try { - // sleep and release object lock - this.wait(); - } catch (InterruptedException e) { - if(log.isInfoEnabled()) - log.info("TCP worker thread interrupted in cluster",e); - // clear interrupt status - Thread.interrupted(); - } - if (key == null) { - continue; // just in case - } - try { - drainChannel (key); - } catch (Exception e) { - //this is common, since the sockets on the other - //end expire after a certain time. - if ( e instanceof CancelledKeyException ) { - //do nothing - } else if ( e instanceof IOException ) { - //dont spew out stack traces for IO exceptions unless debug is enabled. - if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e); - else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); - } else if ( log.isErrorEnabled() ) { - //this is a real error, log it. - log.error("Exception caught in TcpReplicationThread.drainChannel.",e); - } - - // close channel and nudge selector - try { - key.channel().close(); - } catch (IOException ex) { - log.error("Unable to close channel.",ex); - } - key.selector().wakeup(); - } - key = null; - // done, ready for more, return to pool - getPool().returnWorker (this); - } - } - - /** - * Called to initiate a unit of work by this worker thread - * on the provided SelectionKey object. This method is - * synchronized, as is the run() method, so only one key - * can be serviced at a given time. - * Before waking the worker thread, and before returning - * to the main selection loop, this key's interest set is - * updated to remove OP_READ. This will cause the selector - * to ignore read-readiness for this channel while the - * worker thread is servicing it. - */ - public synchronized void serviceChannel (SelectionKey key) { - this.key = key; - key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); - key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); - this.notify(); // awaken the thread - } - - /** - * The actual code which drains the channel associated with - * the given key. This method assumes the key has been - * modified prior to invocation to turn off selection - * interest in OP_READ. When this method completes it - * re-enables OP_READ and calls wakeup() on the selector - * so the selector will resume watching this channel. - */ - protected void drainChannel (final SelectionKey key) throws Exception { - SocketChannel channel = (SocketChannel) key.channel(); - int count; - buffer.clear(); // make buffer empty - ObjectReader reader = (ObjectReader)key.attachment(); - reader.setLastAccess(System.currentTimeMillis()); - // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); - else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - } - - int pkgcnt = reader.count(); - - if ( pkgcnt > 0 ) { - ChannelMessage[] msgs = reader.execute(); - for ( int i=0; i 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + } + + int pkgcnt = reader.count(); + + if ( pkgcnt > 0 ) { + ChannelMessage[] msgs = reader.execute(); + for ( int i=0; i