Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 28699 invoked from network); 17 Oct 2005 14:55:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 17 Oct 2005 14:55:34 -0000 Received: (qmail 78729 invoked by uid 500); 17 Oct 2005 14:55:33 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 78695 invoked by uid 500); 17 Oct 2005 14:55:32 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 78684 invoked by uid 99); 17 Oct 2005 14:55:32 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Oct 2005 07:55:32 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 17 Oct 2005 07:55:32 -0700 Received: (qmail 28533 invoked by uid 65534); 17 Oct 2005 14:55:11 -0000 Message-ID: <20051017145511.28528.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r325903 - in /directory/network: branches/0.7/src/java/org/apache/mina/util/ trunk/src/java/org/apache/mina/filter/ trunk/src/java/org/apache/mina/util/ Date: Mon, 17 Oct 2005 14:55:03 -0000 To: commits@directory.apache.org From: trustin@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: trustin Date: Mon Oct 17 07:54:54 2005 New Revision: 325903 URL: http://svn.apache.org/viewcvs?rev=325903&view=rev Log: Resolved issue: DIRMINA-100 - Prioritized SessionBuffer fetching in ThreadPoolFilter * Replaced BlockingSet with BlockingQueue because unfetchedSessionBuffers works more efficiently with Queue * Removed unused classes * Added ThreadPoolFilter.fetchSessionBuffer() to allow users to prioritize buffer fetching Added: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java (with props) directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java (with props) Removed: directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java directory/network/trunk/src/java/org/apache/mina/util/BlockingSet.java directory/network/trunk/src/java/org/apache/mina/util/Event.java directory/network/trunk/src/java/org/apache/mina/util/EventType.java Modified: directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Modified: directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java?rev=325903&r1=325902&r2=325903&view=diff ============================================================================== --- directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java (original) +++ directory/network/branches/0.7/src/java/org/apache/mina/util/BaseThreadPool.java Mon Oct 17 07:54:54 2005 @@ -20,7 +20,6 @@ import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -82,7 +81,7 @@ private final String threadNamePrefix; private final Map buffers = new IdentityHashMap(); private final Stack followers = new Stack(); - private final BlockingSet unfetchedSessionBuffers = new BlockingSet(); + private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue(); private final Set allSessionBuffers = new HashSet(); private Worker leader; @@ -215,7 +214,7 @@ protected void fireEvent( Object nextFilter, Session session, EventType type, Object data ) { - final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers; + final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers; final Set allSessionBuffers = this.allSessionBuffers; final Event event = new Event( type, nextFilter, data ); @@ -232,7 +231,7 @@ if( !allSessionBuffers.contains( buf ) ) { allSessionBuffers.add( buf ); - unfetchedSessionBuffers.add( buf ); + unfetchedSessionBuffers.push( buf ); } } } @@ -243,6 +242,19 @@ protected abstract void processEvent( Object nextFilter, Session session, EventType type, Object data ); + + /** + * Implement this method to fetch (or pop) a {@link SessionBuffer} from + * the given unfetchedSessionBuffers. The default implementation + * simply pops the buffer from it. You could prioritize the fetch order. + * + * @return A non-null {@link SessionBuffer} + */ + protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers ) + { + return ( SessionBuffer ) unfetchedSessionBuffers.pop(); + } + private SessionBuffer getSessionBuffer( Session session ) { final Map buffers = this.buffers; @@ -272,7 +284,7 @@ } } - private static class SessionBuffer + protected static class SessionBuffer { private final Session session; @@ -341,8 +353,7 @@ private SessionBuffer fetchBuffer() { - SessionBuffer buf; - BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers; + BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers; synchronized( unfetchedSessionBuffers ) { for( ;; ) @@ -363,13 +374,7 @@ } } - Iterator it = unfetchedSessionBuffers.iterator(); - while( it.hasNext() ) - { - buf = ( SessionBuffer ) it.next(); - it.remove(); - return buf; - } + return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers ); } } } @@ -410,7 +415,7 @@ private void releaseBuffer( SessionBuffer buf ) { - final BlockingSet unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers; + final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers; final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers; final Queue eventQueue = buf.eventQueue; @@ -423,7 +428,7 @@ } else { - unfetchedSessionBuffers.add( buf ); + unfetchedSessionBuffers.push( buf ); } } } Added: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java URL: http://svn.apache.org/viewcvs/directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto ============================================================================== --- directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java (added) +++ directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005 @@ -0,0 +1,109 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.util; + +/** + * A synchronized version of {@link Queue}. + * + * @author Trustin Lee + * @version $Rev$, $Date$ + */ +public class BlockingQueue extends Queue +{ + private static final long serialVersionUID = 5516588196355725567L; + + private int waiters = 0; + + public BlockingQueue() + { + } + + public synchronized int capacity() + { + return super.capacity(); + } + + public synchronized void clear() + { + super.clear(); + } + + public synchronized Object first() + { + return super.first(); + } + + public synchronized Object get( int idx ) + { + return super.get( idx ); + } + + public synchronized boolean isEmpty() + { + return super.isEmpty(); + } + + public synchronized Object last() + { + return super.last(); + } + + public synchronized Object pop() + { + return super.pop(); + } + + public synchronized void push( Object obj ) + { + super.push( obj ); + if( waiters > 0 ) + notify(); + } + + public synchronized int size() + { + return super.size(); + } + + public synchronized String toString() + { + return super.toString(); + } + + /** + * Waits until any elements are in this queue. + * + * @throws InterruptedException if the current thread is interrupted + */ + public synchronized void waitForNewItem() throws InterruptedException + { + waiters++; + try + { + while( super.isEmpty() ) + { + wait(); + } + } + finally + { + waiters--; + } + } +} Propchange: directory/network/branches/0.7/src/java/org/apache/mina/util/BlockingQueue.java ------------------------------------------------------------------------------ svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=325903&r1=325902&r2=325903&view=diff ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original) +++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Mon Oct 17 07:54:54 2005 @@ -20,7 +20,6 @@ import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -28,8 +27,7 @@ import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoSession; -import org.apache.mina.util.BaseThreadPool; -import org.apache.mina.util.BlockingSet; +import org.apache.mina.util.BlockingQueue; import org.apache.mina.util.ByteBufferUtil; import org.apache.mina.util.Queue; import org.apache.mina.util.Stack; @@ -43,7 +41,6 @@ * @version $Rev$, $Date$ * * @see ThreadPool - * @see BaseThreadPool */ public class ThreadPoolFilter implements IoFilter { @@ -93,7 +90,7 @@ private final Map parents = new IdentityHashMap(); private final Map buffers = new IdentityHashMap(); private final Stack followers = new Stack(); - private final BlockingSet unfetchedSessionBuffers = new BlockingSet(); + private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue(); private final Set allSessionBuffers = new HashSet(); private Worker leader; @@ -184,10 +181,10 @@ } } - protected void fireEvent( NextFilter nextFilter, IoSession session, + private void fireEvent( NextFilter nextFilter, IoSession session, EventType type, Object data ) { - final BlockingSet unfetchedSessionBuffers = this.unfetchedSessionBuffers; + final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers; final Set allSessionBuffers = this.allSessionBuffers; final Event event = new Event( type, nextFilter, data ); @@ -204,10 +201,22 @@ if( !allSessionBuffers.contains( buf ) ) { allSessionBuffers.add( buf ); - unfetchedSessionBuffers.add( buf ); + unfetchedSessionBuffers.push( buf ); } } } + + /** + * Implement this method to fetch (or pop) a {@link SessionBuffer} from + * the given unfetchedSessionBuffers. The default implementation + * simply pops the buffer from it. You could prioritize the fetch order. + * + * @return A non-null {@link SessionBuffer} + */ + protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers ) + { + return ( SessionBuffer ) unfetchedSessionBuffers.pop(); + } private SessionBuffer getSessionBuffer( IoSession session ) { @@ -238,7 +247,7 @@ } } - private static class SessionBuffer + protected static class SessionBuffer { private final IoSession session; @@ -306,8 +315,7 @@ private SessionBuffer fetchBuffer() { - SessionBuffer buf; - BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers; + BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers; synchronized( unfetchedSessionBuffers ) { for( ;; ) @@ -328,13 +336,7 @@ } } - Iterator it = unfetchedSessionBuffers.iterator(); - while( it.hasNext() ) - { - buf = ( SessionBuffer ) it.next(); - it.remove(); - return buf; - } + return ThreadPoolFilter.this.fetchSessionBuffer( unfetchedSessionBuffers ); } } } @@ -375,7 +377,7 @@ private void releaseBuffer( SessionBuffer buf ) { - final BlockingSet unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers; + final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers; final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers; final Queue eventQueue = buf.eventQueue; @@ -388,7 +390,7 @@ } else { - unfetchedSessionBuffers.add( buf ); + unfetchedSessionBuffers.push( buf ); } } } Added: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java?rev=325903&view=auto ============================================================================== --- directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java (added) +++ directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java Mon Oct 17 07:54:54 2005 @@ -0,0 +1,109 @@ +/* + * @(#) $Id$ + * + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.mina.util; + +/** + * A synchronized version of {@link Queue}. + * + * @author Trustin Lee + * @version $Rev$, $Date$ + */ +public class BlockingQueue extends Queue +{ + private static final long serialVersionUID = 5516588196355725567L; + + private int waiters = 0; + + public BlockingQueue() + { + } + + public synchronized int capacity() + { + return super.capacity(); + } + + public synchronized void clear() + { + super.clear(); + } + + public synchronized Object first() + { + return super.first(); + } + + public synchronized Object get( int idx ) + { + return super.get( idx ); + } + + public synchronized boolean isEmpty() + { + return super.isEmpty(); + } + + public synchronized Object last() + { + return super.last(); + } + + public synchronized Object pop() + { + return super.pop(); + } + + public synchronized void push( Object obj ) + { + super.push( obj ); + if( waiters > 0 ) + notify(); + } + + public synchronized int size() + { + return super.size(); + } + + public synchronized String toString() + { + return super.toString(); + } + + /** + * Waits until any elements are in this queue. + * + * @throws InterruptedException if the current thread is interrupted + */ + public synchronized void waitForNewItem() throws InterruptedException + { + waiters++; + try + { + while( super.isEmpty() ) + { + wait(); + } + } + finally + { + waiters--; + } + } +} Propchange: directory/network/trunk/src/java/org/apache/mina/util/BlockingQueue.java ------------------------------------------------------------------------------ svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision