Return-Path: Delivered-To: apmail-directory-commits-archive@www.apache.org Received: (qmail 29401 invoked from network); 5 Nov 2006 19:37:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Nov 2006 19:37:12 -0000 Received: (qmail 14080 invoked by uid 500); 5 Nov 2006 19:37:24 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 14045 invoked by uid 500); 5 Nov 2006 19:37:24 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 14034 invoked by uid 99); 5 Nov 2006 19:37:23 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Nov 2006 11:37:23 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Nov 2006 11:37:10 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 1A7DE1A9846; Sun, 5 Nov 2006 11:36:44 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r471502 [1/2] - in /directory/branches/mina/1.2: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/common/support/ core/src/main/java/org/apache/mina/filter/ core/src/main/java/org/apache/mina/filter/codec/ core/... 
Date: Sun, 05 Nov 2006 19:36:43 -0000 To: commits@directory.apache.org From: proyal@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061105193644.1A7DE1A9846@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: proyal Date: Sun Nov 5 11:36:41 2006 New Revision: 471502 URL: http://svn.apache.org/viewvc?view=rev&rev=471502 Log: Remove mina's Queue, use java.util.Queue. Replace synchronization with a concurrent implementation. Removed: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/BlockingQueue.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/Queue.java directory/branches/mina/1.2/core/src/test/java/org/apache/mina/util/QueueTest.java Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java 
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java Sun Nov 5 11:36:41 2006 @@ -6,32 +6,32 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. + * */ package org.apache.mina.common; /** * A configuration which is used to configure {@link IoService}. - * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ -public interface IoServiceConfig +public interface IoServiceConfig extends Cloneable { /** * Resturns the default configuration of the new {@link IoSession}s. */ IoSessionConfig getSessionConfig(); - + /** * Returns the {@link IoFilterChainBuilder} which will modify the * {@link IoFilterChain} of all {@link IoSession}s which is created @@ -39,7 +39,7 @@ * The default value is an empty {@link DefaultIoFilterChainBuilder}. */ IoFilterChainBuilder getFilterChainBuilder(); - + /** * Sets the {@link IoFilterChainBuilder} which will modify the * {@link IoFilterChain} of all {@link IoSession}s which is created @@ -48,7 +48,7 @@ * an empty {@link DefaultIoFilterChainBuilder}. */ void setFilterChainBuilder( IoFilterChainBuilder builder ); - + /** * A shortcut for ( ( DefaultIoFilterChainBuilder ) {@link #getFilterChainBuilder()} ). * Please note that the returned object is not a real {@link IoFilterChain} @@ -60,7 +60,7 @@ * not a {@link DefaultIoFilterChainBuilder} */ DefaultIoFilterChainBuilder getFilterChain(); - + /** * Returns the default {@link ThreadModel} of the {@link IoService}. * The default value is a {@link ExecutorThreadModel}() whose service name is @@ -69,7 +69,7 @@ * {@link ExecutorThreadModel#getInstance(String)}. 
*/ ThreadModel getThreadModel(); - + /** * Sets the default {@link ThreadModel} of the {@link IoService}. * If you specify null, this property will be set to the @@ -80,9 +80,10 @@ * {@link ExecutorThreadModel#getInstance(String)}. */ void setThreadModel( ThreadModel threadModel ); - + /** * Returns a deep clone of this configuration. */ + @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "override"}) Object clone(); } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java Sun Nov 5 11:36:41 2006 @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. + * */ package org.apache.mina.common.support; @@ -29,7 +29,7 @@ /** * A base implementation of {@link IoServiceConfig}. - * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ @@ -39,17 +39,17 @@ * Current filter chain builder. 
*/ private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder(); - + /** * The default thread model. */ private final ThreadModel defaultThreadModel = ExecutorThreadModel.getInstance("AnonymousIoService"); - + /** * Current thread model. */ private ThreadModel threadModel = defaultThreadModel; - + public BaseIoServiceConfig() { super(); @@ -68,7 +68,7 @@ } filterChainBuilder = builder; } - + public DefaultIoFilterChainBuilder getFilterChain() { if( filterChainBuilder instanceof DefaultIoFilterChainBuilder ) @@ -81,7 +81,7 @@ "Current filter chain builder is not a DefaultIoFilterChainBuilder." ); } } - + public ThreadModel getThreadModel() { return threadModel; @@ -97,7 +97,9 @@ } this.threadModel = threadModel; } - + + @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException"}) + @Override public Object clone() { BaseIoServiceConfig ret; @@ -109,20 +111,21 @@ { throw ( InternalError ) new InternalError().initCause( e ); } - + // Try to clone the chain builder. 
try { - Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone", null ); + Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone" ); if( cloneMethod.isAccessible() ) { - ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder, null ); + ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder ); } } catch( Exception e ) { + // uncloneable } - + return ret; } } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java Sun Nov 5 11:36:41 2006 @@ -6,50 +6,51 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. 
+ * */ package org.apache.mina.filter; import java.io.IOException; import java.io.InputStream; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; -import org.apache.mina.util.Queue; /** * Filter implementation which makes it possible to write {@link InputStream} - * objects directly using {@link IoSession#write(Object)}. When an + * objects directly using {@link IoSession#write(Object)}. When an * {@link InputStream} is written to a session this filter will read the bytes * from the stream into {@link ByteBuffer} objects and write those buffers * to the next filter. When end of stream has been reached this filter will * call {@link NextFilter#messageSent(IoSession, Object)} using the original - * {@link InputStream} written to the session and notifies - * {@link org.apache.mina.common.WriteFuture} on the + * {@link InputStream} written to the session and notifies + * {@link org.apache.mina.common.WriteFuture} on the * original {@link org.apache.mina.common.IoFilter.WriteRequest}. - *

+ *

* This filter will ignore written messages which aren't {@link InputStream} * instances. Such messages will be passed to the next filter directly. *

- *

+ *

* NOTE: this filter does not close the stream after all data from stream * has been written. The {@link org.apache.mina.common.IoHandler} should take - * care of that in its - * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)} + * care of that in its + * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)} * callback. *

- * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ @@ -69,32 +70,33 @@ protected static final String INITIAL_WRITE_FUTURE = StreamWriteFilter.class.getName() + ".future"; private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE; - - public void filterWrite( NextFilter nextFilter, IoSession session, - WriteRequest writeRequest ) throws Exception + + @Override + public void filterWrite( NextFilter nextFilter, IoSession session, + WriteRequest writeRequest ) throws Exception { // If we're already processing a stream we need to queue the WriteRequest. if( session.getAttribute( CURRENT_STREAM ) != null ) { - Queue queue = ( Queue ) session.getAttribute( WRITE_REQUEST_QUEUE ); + Queue queue = ( Queue ) session.getAttribute( WRITE_REQUEST_QUEUE ); if( queue == null ) { - queue = new Queue(); + queue = new ConcurrentLinkedQueue( ); session.setAttribute( WRITE_REQUEST_QUEUE, queue ); } - queue.push( writeRequest ); + queue.add( writeRequest ); return; } - + Object message = writeRequest.getMessage(); - + if( message instanceof InputStream ) { - + InputStream inputStream = ( InputStream ) message; - + ByteBuffer byteBuffer = getNextByteBuffer( inputStream ); - if ( byteBuffer == null ) + if( byteBuffer == null ) { // End of stream reached. 
writeRequest.getFuture().setWritten( true ); @@ -104,7 +106,7 @@ { session.setAttribute( CURRENT_STREAM, inputStream ); session.setAttribute( INITIAL_WRITE_FUTURE, writeRequest.getFuture() ); - + nextFilter.filterWrite( session, new WriteRequest( byteBuffer ) ); } @@ -115,10 +117,11 @@ } } + @Override public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception { InputStream inputStream = ( InputStream ) session.getAttribute( CURRENT_STREAM ); - + if( inputStream == null ) { nextFilter.messageSent( session, message ); @@ -126,25 +129,25 @@ else { ByteBuffer byteBuffer = getNextByteBuffer( inputStream ); - - if( byteBuffer == null ) + + if( byteBuffer == null ) { // End of stream reached. session.removeAttribute( CURRENT_STREAM ); WriteFuture writeFuture = ( WriteFuture ) session.removeAttribute( INITIAL_WRITE_FUTURE ); - + // Write queued WriteRequests. - Queue queue = ( Queue ) session.removeAttribute( WRITE_REQUEST_QUEUE ); + Queue queue = ( Queue ) session.removeAttribute( WRITE_REQUEST_QUEUE ); if( queue != null ) { - WriteRequest wr = ( WriteRequest ) queue.pop(); + WriteRequest wr = queue.poll(); while( wr != null ) { filterWrite( nextFilter, session, wr ); - wr = ( WriteRequest ) queue.pop(); + wr = queue.poll(); } } - + writeFuture.setWritten( true ); nextFilter.messageSent( session, inputStream ); } @@ -155,32 +158,30 @@ } } - private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException + private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException { byte[] bytes = new byte[ writeBufferSize ]; - + int off = 0; int n = 0; - while( off < bytes.length && - ( n = is.read( bytes, off, bytes.length - off ) ) != -1 ) + while( off < bytes.length && + ( n = is.read( bytes, off, bytes.length - off ) ) != -1 ) { off += n; } - + if( n == -1 && off == 0 ) { return null; } - ByteBuffer buffer = ByteBuffer.wrap( bytes, 0, off ); - - return buffer; + return ByteBuffer.wrap( bytes, 0, off ); } /** - * 
Returns the size of the write buffer in bytes. Data will be read from the + * Returns the size of the write buffer in bytes. Data will be read from the * stream in chunks of this size and then written to the next filter. - * + * * @return the write buffer size. */ public int getWriteBufferSize() @@ -189,9 +190,9 @@ } /** - * Sets the size of the write buffer in bytes. Data will be read from the + * Sets the size of the write buffer in bytes. Data will be read from the * stream in chunks of this size and then written to the next filter. - * + * * @throws IllegalArgumentException if the specified size is < 1. */ public void setWriteBufferSize( int writeBufferSize ) @@ -202,6 +203,6 @@ } this.writeBufferSize = writeBufferSize; } - - + + } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sun Nov 5 11:36:41 2006 @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. 
+ * */ package org.apache.mina.filter.codec; @@ -35,7 +35,7 @@ * An {@link IoFilter} which translates binary or protocol specific data into * message object and vice versa using {@link ProtocolCodecFactory}, * {@link ProtocolEncoder}, or {@link ProtocolDecoder}. - * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ @@ -43,12 +43,12 @@ { public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder"; public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder"; - + private static final Class[] EMPTY_PARAMS = new Class[0]; private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); private final ProtocolCodecFactory factory; - + public ProtocolCodecFilter( ProtocolCodecFactory factory ) { if( factory == null ) @@ -57,7 +57,7 @@ } this.factory = factory; } - + public ProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) { if( encoder == null ) @@ -68,7 +68,7 @@ { throw new NullPointerException( "decoder" ); } - + this.factory = new ProtocolCodecFactory() { public ProtocolEncoder getEncoder() @@ -82,7 +82,7 @@ } }; } - + public ProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) { if( encoderClass == null ) @@ -117,7 +117,7 @@ { throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." 
); } - + this.factory = new ProtocolCodecFactory() { public ProtocolEncoder getEncoder() throws Exception @@ -132,6 +132,7 @@ }; } + @Override public void onPreAdd( IoFilterChain parent, String name, NextFilter nextFilter ) throws Exception { if( parent.contains( ProtocolCodecFilter.class ) ) @@ -140,6 +141,7 @@ } } + @Override public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception { if( !( message instanceof ByteBuffer ) ) @@ -151,7 +153,7 @@ ByteBuffer in = ( ByteBuffer ) message; ProtocolDecoder decoder = getDecoder( session ); ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); - + try { decoder.decode( session, in, decoderOut ); @@ -185,6 +187,7 @@ } } + @Override public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception { if( message instanceof HiddenByteBuffer ) @@ -200,7 +203,8 @@ nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); } - + + @Override public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception { Object message = writeRequest.getMessage(); @@ -212,7 +216,7 @@ ProtocolEncoder encoder = getEncoder( session ); ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); - + try { encoder.encode( session, message, encoderOut ); @@ -245,7 +249,8 @@ } } } - + + @Override public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception { // Call finishDecode() first when a connection is closed. 
@@ -276,7 +281,7 @@ decoderOut.flush(); } - + nextFilter.sessionClosed( session ); } @@ -290,12 +295,12 @@ } return encoder; } - + private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, NextFilter nextFilter, WriteRequest writeRequest ) { return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); } - + private ProtocolDecoder getDecoder( IoSession session ) throws Exception { ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); @@ -306,12 +311,12 @@ } return decoder; } - + private ProtocolDecoderOutput getDecoderOut( IoSession session, NextFilter nextFilter ) { return new SimpleProtocolDecoderOutput( session, nextFilter ); } - + private void disposeEncoder( IoSession session ) { ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); @@ -361,41 +366,44 @@ super( buf ); } } - + private static class MessageByteBuffer extends ByteBufferProxy { private final Object message; - + private MessageByteBuffer( Object message ) { super( EMPTY_BUFFER ); this.message = message; } + @Override public void acquire() { // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message } + @Override public void release() { // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message } } - + private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput { private final IoSession session; private final NextFilter nextFilter; private final WriteRequest writeRequest; - - public ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest ) + + ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest ) { this.session = session; this.nextFilter = nextFilter; this.writeRequest = writeRequest; } + @Override protected WriteFuture doFlush( ByteBuffer buf ) { WriteFuture future = new DefaultWriteFuture( session ); Modified: 
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java Sun Nov 5 11:36:41 2006 @@ -6,28 +6,30 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. + * */ package org.apache.mina.filter.codec.support; -import org.apache.mina.common.IoSession; +import java.util.ArrayList; +import java.util.List; + import org.apache.mina.common.IoFilter.NextFilter; +import org.apache.mina.common.IoSession; import org.apache.mina.common.support.BaseIoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.util.Queue; /** * A {@link ProtocolDecoderOutput} based on queue. 
- * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ * @@ -36,17 +38,17 @@ { private final NextFilter nextFilter; private final IoSession session; - private final Queue messageQueue = new Queue(); - + private final List messageQueue = new ArrayList(); + public SimpleProtocolDecoderOutput( IoSession session, NextFilter nextFilter ) { this.nextFilter = nextFilter; this.session = session; } - + public void write( Object message ) { - messageQueue.push( message ); + messageQueue.add( message ); if( session instanceof BaseIoSession ) { ( ( BaseIoSession ) session ).increaseReadMessages(); @@ -57,8 +59,8 @@ { while( !messageQueue.isEmpty() ) { - nextFilter.messageReceived( session, messageQueue.pop() ); + nextFilter.messageReceived( session, messageQueue.remove(0) ); } - + } } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java Sun Nov 5 11:36:41 2006 @@ -6,23 +6,25 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. 
See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. + * */ package org.apache.mina.filter.codec.support; +import java.util.ArrayList; +import java.util.List; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.WriteFuture; import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.util.Queue; /** * A {@link ProtocolEncoderOutput} based on queue. @@ -32,88 +34,73 @@ */ public abstract class SimpleProtocolEncoderOutput implements ProtocolEncoderOutput { - private final Queue bufferQueue = new Queue(); - + private final List bufferQueue = new ArrayList(); + public SimpleProtocolEncoderOutput() { } - - public Queue getBufferQueue() + + public List getBufferQueue() { return bufferQueue; } - + public void write( ByteBuffer buf ) { - bufferQueue.push( buf ); + bufferQueue.add( buf ); } - + public void mergeAll() { - int sum = 0; final int size = bufferQueue.size(); - + if( size < 2 ) { // no need to merge! return; } - + // Get the size of merged BB + int sum = 0; for( int i = size - 1; i >= 0; i -- ) { - sum += ( ( ByteBuffer ) bufferQueue.get( i ) ).remaining(); + sum += bufferQueue.get( i ).remaining(); } - + // Allocate a new BB that will contain all fragments ByteBuffer newBuf = ByteBuffer.allocate( sum ); - + // and merge all. - for( ;; ) + for( ; !bufferQueue.isEmpty(); ) { - ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop(); - if( buf == null ) - { - break; - } - + ByteBuffer buf = bufferQueue.remove( 0 ); + newBuf.put( buf ); buf.release(); } - + // Push the new buffer finally. 
newBuf.flip(); - bufferQueue.push(newBuf); + bufferQueue.add( newBuf ); } - + public WriteFuture flush() { - Queue bufferQueue = this.bufferQueue; WriteFuture future = null; - if( bufferQueue.isEmpty() ) - { - return null; - } - else + + for( ; !bufferQueue.isEmpty(); ) { - for( ;; ) + ByteBuffer buf = bufferQueue.remove( 0 ); + + // Flush only when the buffer has remaining. + if( buf.hasRemaining() ) { - ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop(); - if( buf == null ) - { - break; - } - - // Flush only when the buffer has remaining. - if( buf.hasRemaining() ) - { - future = doFlush( buf ); - } + future = doFlush( buf ); } } - + return future; } - + protected abstract WriteFuture doFlush( ByteBuffer buf ); } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Nov 5 11:36:41 2006 @@ -27,21 +27,23 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.mina.common.ExceptionMonitor; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoServiceConfig; 
import org.apache.mina.common.support.BaseIoAcceptor; -import org.apache.mina.util.Queue; -import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.NamePreservingRunnable; -import java.util.concurrent.Executor; +import org.apache.mina.util.NewThreadExecutor; /** * {@link IoAcceptor} for socket transport (TCP/IP). @@ -51,27 +53,21 @@ */ public class SocketAcceptor extends BaseIoAcceptor { - /** - * @noinspection StaticNonFinalField - */ - private static volatile int nextId = 0; + private static final AtomicInteger nextId = new AtomicInteger(); private final Executor executor; private final Object lock = new Object(); - private final int id = nextId ++; + private final int id = nextId.getAndIncrement(); private final String threadName = "SocketAcceptor-" + id; private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig(); - private final Map channels = new HashMap(); + private final Map channels = new ConcurrentHashMap(); - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); + private final Queue registerQueue = new ConcurrentLinkedQueue(); + private final Queue cancelQueue = new ConcurrentLinkedQueue(); private final SocketIoProcessor[] ioProcessors; private final int processorCount; - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ private Selector selector; private Worker worker; private int processorDistributor = 0; @@ -88,7 +84,7 @@ * Create an acceptor with the desired number of processing threads * * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads + * @param executor Executor to use for launching threads */ public SocketAcceptor( int processorCount, Executor executor ) { @@ -132,10 +128,7 @@ RegistrationRequest request = new RegistrationRequest( address, handler, config ); - synchronized( registerQueue ) - { - registerQueue.push( request ); - } + registerQueue.add( request ); startupWorker(); @@ -199,10 
+192,7 @@ throw new IllegalArgumentException( "Address not bound: " + address ); } - synchronized( cancelQueue ) - { - cancelQueue.push( request ); - } + cancelQueue.add( request ); selector.wakeup(); @@ -231,15 +221,11 @@ public void unbindAll() { - List addresses; - synchronized( channels ) - { - addresses = new ArrayList( channels.keySet() ); - } + List addresses = new ArrayList( channels.keySet() ); - for( Iterator i = addresses.iterator(); i.hasNext(); ) + for( SocketAddress address : addresses ) { - unbind( ( SocketAddress ) i.next() ); + unbind( address ); } } @@ -247,7 +233,7 @@ { public void run() { - Thread.currentThread().setName(SocketAcceptor.this.threadName ); + Thread.currentThread().setName( SocketAcceptor.this.threadName ); for( ; ; ) { @@ -306,12 +292,12 @@ } } - private void processSessions( Set keys ) throws IOException + private void processSessions( Set keys ) throws IOException { - Iterator it = keys.iterator(); + Iterator it = keys.iterator(); while( it.hasNext() ) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = it.next(); it.remove(); @@ -334,8 +320,8 @@ { RegistrationRequest req = ( RegistrationRequest ) key.attachment(); SocketSessionImpl session = new SocketSessionImpl( - SocketAcceptor.this, nextProcessor(), getListeners(), - req.config, ch, req.handler, req.address ); + SocketAcceptor.this, nextProcessor(), getListeners(), + req.config, ch, req.handler, req.address ); getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); @@ -391,12 +377,7 @@ for( ; ; ) { - RegistrationRequest req; - - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } + RegistrationRequest req = registerQueue.poll(); if( req == null ) { @@ -433,13 +414,10 @@ } ssc.register( selector, SelectionKey.OP_ACCEPT, req ); - synchronized( channels 
) - { - channels.put( req.address, ssc ); - } + channels.put( req.address, ssc ); getListeners().fireServiceActivated( - this, req.address, req.handler, req.config ); + this, req.address, req.handler, req.config ); } catch( IOException e ) { @@ -479,23 +457,14 @@ for( ; ; ) { - CancellationRequest request; - - synchronized( cancelQueue ) - { - request = ( CancellationRequest ) cancelQueue.pop(); - } + CancellationRequest request = cancelQueue.poll(); if( request == null ) { break; } - ServerSocketChannel ssc; - synchronized( channels ) - { - ssc = ( ServerSocketChannel ) channels.remove( request.address ); - } + ServerSocketChannel ssc = channels.remove( request.address ); // close the channel try @@ -530,9 +499,9 @@ if( request.exception == null ) { getListeners().fireServiceDeactivated( - this, request.address, - request.registrationRequest.handler, - request.registrationRequest.config ); + this, request.address, + request.registrationRequest.handler, + request.registrationRequest.config ); } } } Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Nov 5 11:36:41 2006 @@ -26,8 +26,11 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.Iterator; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import 
org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExceptionMonitor; @@ -38,10 +41,8 @@ import org.apache.mina.common.support.AbstractIoFilterChain; import org.apache.mina.common.support.BaseIoConnector; import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.util.Queue; -import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.NamePreservingRunnable; -import java.util.concurrent.Executor; +import org.apache.mina.util.NewThreadExecutor; /** * {@link IoConnector} for socket transport (TCP/IP). @@ -51,23 +52,17 @@ */ public class SocketConnector extends BaseIoConnector { - /** - * @noinspection StaticNonFinalField - */ - private static volatile int nextId = 0; + private static final AtomicInteger nextId = new AtomicInteger( ); private final Object lock = new Object(); - private final int id = nextId++; + private final int id = nextId.getAndIncrement(); private final String threadName = "SocketConnector-" + id; private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); - private final Queue connectQueue = new Queue(); + private final Queue connectQueue = new ConcurrentLinkedQueue( ); private final SocketIoProcessor[] ioProcessors; private final int processorCount; private final Executor executor; - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ private Selector selector; private Worker worker; private int processorDistributor = 0; @@ -218,10 +213,7 @@ } } - synchronized( connectQueue ) - { - connectQueue.push( request ); - } + connectQueue.add( request ); selector.wakeup(); return request; @@ -264,11 +256,7 @@ for( ; ; ) { - ConnectionRequest req; - synchronized( connectQueue ) - { - req = ( ConnectionRequest ) connectQueue.pop(); - } + ConnectionRequest req = connectQueue.poll(); if( req == null ) break; @@ -285,14 +273,10 @@ } } - private void processSessions( Set keys ) + private void processSessions( Set keys ) { - Iterator it = keys.iterator(); - - while( 
it.hasNext() ) + for( SelectionKey key : keys ) { - SelectionKey key = ( SelectionKey ) it.next(); - if( !key.isConnectable() ) continue; @@ -330,15 +314,12 @@ keys.clear(); } - private void processTimedOutSessions( Set keys ) + private void processTimedOutSessions( Set keys ) { long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - while( it.hasNext() ) + for( SelectionKey key : keys ) { - SelectionKey key = ( SelectionKey ) it.next(); - if( !key.isValid() ) continue; Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Sun Nov 5 11:36:41 2006 @@ -6,31 +6,31 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. 
+ * */ package org.apache.mina.transport.socket.nio; import java.io.IOException; +import java.util.Queue; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.support.AbstractIoFilterChain; -import org.apache.mina.util.Queue; /** * An {@link IoFilterChain} for socket transport (TCP/IP). - * + * * @author The Apache Directory Project (mina-dev@directory.apache.org) */ class SocketFilterChain extends AbstractIoFilterChain { @@ -40,17 +40,18 @@ super( parent ); } - protected void doWrite( IoSession session, WriteRequest writeRequest ) + @Override + protected void doWrite( IoSession session, WriteRequest writeRequest ) { SocketSessionImpl s = ( SocketSessionImpl ) session; - Queue writeRequestQueue = s.getWriteRequestQueue(); - + Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue(); + // SocketIoProcessor.doFlush() will reset it after write is finished - // because the buffer will be passed with messageSent event. + // because the buffer will be passed with messageSent event. ( ( ByteBuffer ) writeRequest.getMessage() ).mark(); synchronized( writeRequestQueue ) { - writeRequestQueue.push( writeRequest ); + writeRequestQueue.add( writeRequest ); if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() ) { // Notify SocketIoProcessor only when writeRequestQueue was empty.
@@ -59,7 +60,8 @@ } } - protected void doClose( IoSession session ) throws IOException + @Override + protected void doClose( IoSession session ) throws IOException { SocketSessionImpl s = ( SocketSessionImpl ) session; s.getIoProcessor().remove( s ); Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Nov 5 11:36:41 2006 @@ -23,8 +23,9 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.Iterator; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import org.apache.mina.common.ByteBuffer; @@ -33,7 +34,6 @@ import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.WriteTimeoutException; import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.Queue; /** * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally. 
@@ -47,15 +47,12 @@ private final String threadName; private final Executor executor; - /** - * @noinspection FieldAccessedSynchronizedAndUnsynchronized - */ private Selector selector; - private final Queue newSessions = new Queue(); - private final Queue removingSessions = new Queue(); - private final Queue flushingSessions = new Queue(); - private final Queue trafficControllingSessions = new Queue(); + private final Queue newSessions = new ConcurrentLinkedQueue(); + private final Queue removingSessions = new ConcurrentLinkedQueue(); + private final Queue flushingSessions = new ConcurrentLinkedQueue(); + private final Queue trafficControllingSessions = new ConcurrentLinkedQueue(); private Worker worker; private long lastIdleCheckTime = System.currentTimeMillis(); @@ -68,10 +65,7 @@ void addNew( SocketSessionImpl session ) throws IOException { - synchronized( newSessions ) - { - newSessions.push( session ); - } + newSessions.add( session ); startupWorker(); @@ -120,41 +114,24 @@ private void scheduleRemove( SocketSessionImpl session ) { - synchronized( removingSessions ) - { - removingSessions.push( session ); - } + removingSessions.add( session ); } private void scheduleFlush( SocketSessionImpl session ) { - synchronized( flushingSessions ) - { - flushingSessions.push( session ); - } + flushingSessions.add( session ); } private void scheduleTrafficControl( SocketSessionImpl session ) { - synchronized( trafficControllingSessions ) - { - trafficControllingSessions.push( session ); - } + trafficControllingSessions.add( session ); } private void doAddNew() { - if( newSessions.isEmpty() ) - return; - for( ; ; ) { - SocketSessionImpl session; - - synchronized( newSessions ) - { - session = ( SocketSessionImpl ) newSessions.pop(); - } + SocketSessionImpl session = newSessions.poll(); if( session == null ) break; @@ -164,8 +141,8 @@ { ch.configureBlocking( false ); session.setSelectionKey( ch.register( selector, - SelectionKey.OP_READ, - session ) ); + 
SelectionKey.OP_READ, + session ) ); // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). @@ -182,17 +159,9 @@ private void doRemove() { - if( removingSessions.isEmpty() ) - return; - for( ; ; ) { - SocketSessionImpl session; - - synchronized( removingSessions ) - { - session = ( SocketSessionImpl ) removingSessions.pop(); - } + SocketSessionImpl session = removingSessions.poll(); if( session == null ) break; @@ -229,13 +198,10 @@ } } - private void process( Set selectedKeys ) + private void process( Set selectedKeys ) { - Iterator it = selectedKeys.iterator(); - - while( it.hasNext() ) + for( SelectionKey key : selectedKeys ) { - SelectionKey key = ( SelectionKey ) it.next(); SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); if( key.isReadable() && session.getTrafficMask().isReadable() ) @@ -308,12 +274,11 @@ if( ( currentTime - lastIdleCheckTime ) >= 1000 ) { lastIdleCheckTime = currentTime; - Set keys = selector.keys(); + Set keys = selector.keys(); if( keys != null ) { - for( Iterator it = keys.iterator(); it.hasNext(); ) + for( SelectionKey key : keys ) { - SelectionKey key = ( SelectionKey ) it.next(); SocketSessionImpl session = ( SocketSessionImpl ) key.attachment(); notifyIdleness( session, currentTime ); } @@ -371,17 +336,9 @@ private void doFlush() { - if( flushingSessions.size() == 0 ) - return; - for( ; ; ) { - SocketSessionImpl session; - - synchronized( flushingSessions ) - { - session = ( SocketSessionImpl ) flushingSessions.pop(); - } + SocketSessionImpl session = flushingSessions.poll(); if( session == null ) break; @@ -420,10 +377,10 @@ private void releaseWriteBuffers( SocketSessionImpl session ) { - Queue writeRequestQueue = session.getWriteRequestQueue(); + Queue writeRequestQueue = session.getWriteRequestQueue(); WriteRequest req; - while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null ) + while( ( req = writeRequestQueue.poll() ) != null ) { try { @@ 
-447,16 +404,11 @@ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) ); SocketChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); + Queue writeRequestQueue = session.getWriteRequestQueue(); for( ; ; ) { - WriteRequest req; - - synchronized( writeRequestQueue ) - { - req = ( WriteRequest ) writeRequestQueue.first(); - } + WriteRequest req = writeRequestQueue.peek(); if( req == null ) break; @@ -464,10 +416,7 @@ ByteBuffer buf = ( ByteBuffer ) req.getMessage(); if( buf.remaining() == 0 ) { - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } + writeRequestQueue.poll(); session.increaseWrittenMessages(); @@ -498,12 +447,7 @@ for( ; ; ) { - SocketSessionImpl session; - - synchronized( trafficControllingSessions ) - { - session = ( SocketSessionImpl ) trafficControllingSessions.pop(); - } + SocketSessionImpl session = trafficControllingSessions.poll(); if( session == null ) break; @@ -526,7 +470,7 @@ // The normal is OP_READ and, if there are write requests in the // session's write queue, set OP_WRITE to trigger flushing. 
int ops = SelectionKey.OP_READ; - Queue writeRequestQueue = session.getWriteRequestQueue(); + Queue writeRequestQueue = session.getWriteRequestQueue(); synchronized( writeRequestQueue ) { if( !writeRequestQueue.isEmpty() ) Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sun Nov 5 11:36:41 2006 @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. 
+ * */ package org.apache.mina.transport.socket.nio; @@ -23,7 +23,10 @@ import java.net.SocketException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoService; @@ -32,11 +35,10 @@ import org.apache.mina.common.IoSessionConfig; import org.apache.mina.common.RuntimeIOException; import org.apache.mina.common.TransportType; -import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.support.BaseIoSession; import org.apache.mina.common.support.BaseIoSessionConfig; import org.apache.mina.common.support.IoServiceListenerSupport; -import org.apache.mina.util.Queue; /** * An {@link IoSession} for socket transport (TCP/IP). @@ -52,7 +54,7 @@ private final SocketIoProcessor ioProcessor; private final SocketFilterChain filterChain; private final SocketChannel ch; - private final Queue writeRequestQueue; + private final Queue<WriteRequest> writeRequestQueue; private final IoHandler handler; private final SocketAddress remoteAddress; private final SocketAddress localAddress; @@ -77,7 +79,7 @@ this.ioProcessor = ioProcessor; this.filterChain = new SocketFilterChain( this ); this.ch = ch; - this.writeRequestQueue = new Queue(); + this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>(); this.handler = defaultHandler; this.remoteAddress = ch.socket().getRemoteSocketAddress(); this.localAddress = ch.socket().getLocalSocketAddress(); @@ -109,7 +111,7 @@ { return manager; } - + public IoServiceConfig getServiceConfig() { return serviceConfig; @@ -149,18 +151,19 @@ { this.key = key; } - + public IoHandler getHandler() { return handler; } + @Override protected void close0() { filterChain.fireFilterClose( this ); } - Queue getWriteRequestQueue() + Queue<WriteRequest>
getWriteRequestQueue() { return writeRequestQueue; } @@ -173,14 +176,25 @@ } } + /** + * Returns the sum of the 'remaining' of all {@link ByteBuffer}s + * in the writeRequestQueue queue. + * + * @throws ClassCastException if an element is not a {@link ByteBuffer} + */ public int getScheduledWriteBytes() { - synchronized( writeRequestQueue ) + int byteSize = 0; + + for( WriteRequest request : writeRequestQueue ) { - return writeRequestQueue.byteSize(); + byteSize += ( ( ByteBuffer ) request.getMessage() ).remaining(); } + + return byteSize; } + @Override protected void write0( WriteRequest writeRequest ) { filterChain.fireFilterWrite( this, writeRequest ); @@ -206,6 +220,7 @@ return serviceAddress; } + @Override protected void updateTrafficMask() { this.ioProcessor.updateTrafficMask( this ); @@ -216,6 +231,7 @@ return readBufferSize; } + @SuppressWarnings( {"CloneableClassWithoutClone"} ) private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig { public boolean isKeepAlive() Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=471502&r1=471501&r2=471502 ============================================================================== --- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original) +++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Sun Nov 5 11:36:41 2006 @@ -26,29 +26,31 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import 
java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ExceptionMonitor; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.common.IoSessionRecycler; -import org.apache.mina.common.IoFilter.WriteRequest; +import org.apache.mina.common.RuntimeIOException; import org.apache.mina.common.support.BaseIoAcceptor; import org.apache.mina.common.support.IoServiceListenerSupport; import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig; import org.apache.mina.transport.socket.nio.DatagramServiceConfig; import org.apache.mina.transport.socket.nio.DatagramSessionConfig; import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.Queue; - -import java.util.concurrent.Executor; /** * {@link IoAcceptor} for datagram transport (UDP/IP). 
@@ -58,17 +60,18 @@ */ public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramService { - private static volatile int nextId = 0; + private static final AtomicInteger nextId = new AtomicInteger(); + private final Object lock = new Object(); private final IoAcceptor wrapper; private final Executor executor; - private final int id = nextId ++ ; + private final int id = nextId.getAndIncrement(); private Selector selector; private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig(); - private final Map channels = new HashMap(); - private final Queue registerQueue = new Queue(); - private final Queue cancelQueue = new Queue(); - private final Queue flushingSessions = new Queue(); + private final Map channels = new ConcurrentHashMap(); + private final Queue registerQueue = new ConcurrentLinkedQueue(); + private final Queue cancelQueue = new ConcurrentLinkedQueue(); + private final Queue flushingSessions = new ConcurrentLinkedQueue(); private Worker worker; /** @@ -81,7 +84,7 @@ } public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) - throws IOException + throws IOException { if( handler == null ) throw new NullPointerException( "handler" ); @@ -92,17 +95,12 @@ if( address != null && !( address instanceof InetSocketAddress ) ) throw new IllegalArgumentException( "Unexpected address type: " - + address.getClass() ); + + address.getClass() ); RegistrationRequest request = new RegistrationRequest( address, handler, config ); - synchronized( this ) - { - synchronized( registerQueue ) - { - registerQueue.push( request ); - } - startupWorker(); - } + registerQueue.add( request ); + startupWorker(); + selector.wakeup(); synchronized( request ) @@ -115,6 +113,7 @@ } catch( InterruptedException e ) { + throw new RuntimeIOException( e ); } } } @@ -131,26 +130,20 @@ throw new NullPointerException( "address" ); CancellationRequest request = new CancellationRequest( address ); - synchronized( this ) + 
try { - try - { - startupWorker(); - } - catch( IOException e ) - { - // IOException is thrown only when Worker thread is not - // running and failed to open a selector. We simply throw - // IllegalArgumentException here because we can simply - // conclude that nothing is bound to the selector. - throw new IllegalArgumentException( "Address not bound: " + address ); - } - - synchronized( cancelQueue ) - { - cancelQueue.push( request ); - } + startupWorker(); } + catch( IOException e ) + { + // IOException is thrown only when Worker thread is not + // running and failed to open a selector. We simply throw + // IllegalArgumentException here because we can simply + // conclude that nothing is bound to the selector. + throw new IllegalArgumentException( "Address not bound: " + address ); + } + + cancelQueue.add( request ); selector.wakeup(); synchronized( request ) @@ -163,30 +156,28 @@ } catch( InterruptedException e ) { + throw new RuntimeIOException( e ); } } } if( request.exception != null ) { - throw new RuntimeException( "Failed to unbind" , request.exception ); + throw new RuntimeException( "Failed to unbind", request.exception ); } } public void unbindAll() { - List addresses; - synchronized( channels ) - { - addresses = new ArrayList( channels.keySet() ); - } + List addresses = new ArrayList( channels.keySet() ); - for( Iterator i = addresses.iterator(); i.hasNext(); ) + for( SocketAddress address : addresses ) { - unbind( ( SocketAddress ) i.next() ); + unbind( address ); } } + @Override public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress ) { if( remoteAddress == null ) @@ -199,7 +190,7 @@ } Selector selector = this.selector; - DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress ); + DatagramChannel ch = channels.get( localAddress ); if( selector == null || ch == null ) { throw new IllegalArgumentException( "Unknown localAddress: " + localAddress ); @@ -214,7 +205,7 @@ RegistrationRequest req = ( 
RegistrationRequest ) key.attachment(); IoSession session; IoSessionRecycler sessionRecycler = getSessionRecycler( req ); - synchronized ( sessionRecycler ) + synchronized( sessionRecycler ) { session = sessionRecycler.recycle( localAddress, remoteAddress ); if( session != null ) @@ -224,9 +215,9 @@ // If a new session needs to be created. DatagramSessionImpl datagramSession = new DatagramSessionImpl( - wrapper, this, - req.config, ch, req.handler, - req.address ); + wrapper, this, + req.config, ch, req.handler, + req.address ); datagramSession.setRemoteAddress( remoteAddress ); datagramSession.setSelectionKey( key ); @@ -261,6 +252,7 @@ return sessionRecycler; } + @Override public IoServiceListenerSupport getListeners() { return super.getListeners(); @@ -293,13 +285,16 @@ this.defaultConfig = defaultConfig; } - private synchronized void startupWorker() throws IOException + private void startupWorker() throws IOException { - if( worker == null ) + synchronized( lock ) { - selector = Selector.open(); - worker = new Worker(); - executor.execute( new NamePreservingRunnable( worker ) ); + if( worker == null ) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute( new NamePreservingRunnable( worker ) ); + } } } @@ -319,10 +314,7 @@ private void scheduleFlush( DatagramSessionImpl session ) { - synchronized( flushingSessions ) - { - flushingSessions.push( session ); - } + flushingSessions.add( session ); } private class Worker implements Runnable @@ -331,7 +323,7 @@ { Thread.currentThread().setName( "DatagramAcceptor-" + id ); - for( ;; ) + for( ; ; ) { try { @@ -349,7 +341,7 @@ if( selector.keys().isEmpty() ) { - synchronized( DatagramAcceptorDelegate.this ) + synchronized( lock ) { if( selector.keys().isEmpty() && registerQueue.isEmpty() && @@ -383,18 +375,19 @@ } catch( InterruptedException e1 ) { + ExceptionMonitor.getInstance().exceptionCaught( e1 ); } } } } } - private void processReadySessions( Set keys ) + private void processReadySessions( 
Set keys ) { - Iterator it = keys.iterator(); + Iterator it = keys.iterator(); while( it.hasNext() ) { - SelectionKey key = ( SelectionKey ) it.next(); + SelectionKey key = it.next(); it.remove(); DatagramChannel ch = ( DatagramChannel ) key.channel(); @@ -409,10 +402,9 @@ if( key.isWritable() ) { - for( Iterator i = getManagedSessions( req.address ).iterator(); - i.hasNext(); ) + for( Object o : getManagedSessions( req.address ) ) { - scheduleFlush( ( DatagramSessionImpl ) i.next() ); + scheduleFlush( ( DatagramSessionImpl ) o ); } } } @@ -426,7 +418,7 @@ private void readSession( DatagramChannel channel, RegistrationRequest req ) throws Exception { ByteBuffer readBuf = ByteBuffer.allocate( - ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() ); + ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() ); try { SocketAddress remoteAddress = channel.receive( @@ -457,14 +449,9 @@ if( flushingSessions.size() == 0 ) return; - for( ;; ) + for( ; ; ) { - DatagramSessionImpl session; - - synchronized( flushingSessions ) - { - session = ( DatagramSessionImpl ) flushingSessions.pop(); - } + DatagramSessionImpl session = flushingSessions.poll(); if( session == null ) break; @@ -484,15 +471,11 @@ { DatagramChannel ch = session.getChannel(); - Queue writeRequestQueue = session.getWriteRequestQueue(); + Queue writeRequestQueue = session.getWriteRequestQueue(); - WriteRequest req; - for( ;; ) + for( ; ; ) { - synchronized( writeRequestQueue ) - { - req = ( WriteRequest ) writeRequestQueue.first(); - } + WriteRequest req = writeRequestQueue.peek(); if( req == null ) break; @@ -501,14 +484,11 @@ if( buf.remaining() == 0 ) { // pop and fire event - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } + writeRequestQueue.poll(); session.increaseWrittenMessages(); buf.reset(); - ( ( DatagramFilterChain ) session.getFilterChain() ).fireMessageSent( session, req ); + session.getFilterChain().fireMessageSent( 
session, req ); continue; } @@ -539,13 +519,10 @@ else if( writtenBytes > 0 ) { key.interestOps( key.interestOps() - & ( ~SelectionKey.OP_WRITE ) ); + & ( ~SelectionKey.OP_WRITE ) ); // pop and fire event - synchronized( writeRequestQueue ) - { - writeRequestQueue.pop(); - } + writeRequestQueue.poll(); session.increaseWrittenBytes( writtenBytes ); session.increaseWrittenMessages(); @@ -560,13 +537,9 @@ if( registerQueue.isEmpty() ) return; - for( ;; ) + for( ; ; ) { - RegistrationRequest req; - synchronized( registerQueue ) - { - req = ( RegistrationRequest ) registerQueue.pop(); - } + RegistrationRequest req = registerQueue.poll(); if( req == null ) break; @@ -602,13 +575,10 @@ req.address = ( InetSocketAddress ) ch.socket().getLocalSocketAddress(); } ch.register( selector, SelectionKey.OP_READ, req ); - synchronized( channels ) - { - channels.put( req.address, ch ); - } + channels.put( req.address, ch ); getListeners().fireServiceActivated( - this, req.address, req.handler, req.config); + this, req.address, req.handler, req.config ); } catch( Throwable t ) { @@ -643,24 +613,16 @@ if( cancelQueue.isEmpty() ) return; - for( ;; ) + for( ; ; ) { - CancellationRequest request; - synchronized( cancelQueue ) - { - request = ( CancellationRequest ) cancelQueue.pop(); - } + CancellationRequest request = cancelQueue.poll(); if( request == null ) { break; } - DatagramChannel ch; - synchronized( channels ) - { - ch = ( DatagramChannel ) channels.remove( request.address ); - } + DatagramChannel ch = channels.remove( request.address ); // close the channel try @@ -668,7 +630,7 @@ if( ch == null ) { request.exception = new IllegalArgumentException( - "Address not bound: " + request.address ); + "Address not bound: " + request.address ); } else { @@ -695,9 +657,9 @@ if( request.exception == null ) { getListeners().fireServiceDeactivated( - this, request.address, - request.registrationRequest.handler, - request.registrationRequest.config ); + this, request.address, + 
request.registrationRequest.handler, + request.registrationRequest.config ); } } }