directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pro...@apache.org
Subject svn commit: r471502 [1/2] - in /directory/branches/mina/1.2: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/common/support/ core/src/main/java/org/apache/mina/filter/ core/src/main/java/org/apache/mina/filter/codec/ core/...
Date Sun, 05 Nov 2006 19:36:43 GMT
Author: proyal
Date: Sun Nov  5 11:36:41 2006
New Revision: 471502

URL: http://svn.apache.org/viewvc?view=rev&rev=471502
Log:
Remove mina's Queue, use java.util.Queue. Replace synchronization with a concurrent implementation.

Removed:
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/BlockingQueue.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/Queue.java
    directory/branches/mina/1.2/core/src/test/java/org/apache/mina/util/QueueTest.java
Modified:
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java
    directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
    directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
    directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java
    directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java
    directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java Sun Nov  5 11:36:41 2006
@@ -6,32 +6,32 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.common;
 
 /**
  * A configuration which is used to configure {@link IoService}.
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
-public interface IoServiceConfig
+public interface IoServiceConfig extends Cloneable
 {
     /**
      * Resturns the default configuration of the new {@link IoSession}s.
      */
     IoSessionConfig getSessionConfig();
-    
+
     /**
      * Returns the {@link IoFilterChainBuilder} which will modify the
      * {@link IoFilterChain} of all {@link IoSession}s which is created
@@ -39,7 +39,7 @@
      * The default value is an empty {@link DefaultIoFilterChainBuilder}.
      */
     IoFilterChainBuilder getFilterChainBuilder();
-    
+
     /**
      * Sets the {@link IoFilterChainBuilder} which will modify the
      * {@link IoFilterChain} of all {@link IoSession}s which is created
@@ -48,7 +48,7 @@
      * an empty {@link DefaultIoFilterChainBuilder}.
      */
     void setFilterChainBuilder( IoFilterChainBuilder builder );
-    
+
     /**
      * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
      * Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
@@ -60,7 +60,7 @@
      *                               not a {@link DefaultIoFilterChainBuilder}
      */
     DefaultIoFilterChainBuilder getFilterChain();
-    
+
     /**
      * Returns the default {@link ThreadModel} of the {@link IoService}.
      * The default value is a {@link ExecutorThreadModel}() whose service name is
@@ -69,7 +69,7 @@
      * {@link ExecutorThreadModel#getInstance(String)}.
      */
     ThreadModel getThreadModel();
-    
+
     /**
      * Sets the default {@link ThreadModel} of the {@link IoService}.
      * If you specify <tt>null</tt>, this property will be set to the
@@ -80,9 +80,10 @@
      * {@link ExecutorThreadModel#getInstance(String)}.
      */
     void setThreadModel( ThreadModel threadModel );
-    
+
     /**
      * Returns a deep clone of this configuration.
      */
+    @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "override"})
     Object clone();
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.common.support;
 
@@ -29,7 +29,7 @@
 
 /**
  * A base implementation of {@link IoServiceConfig}.
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -39,17 +39,17 @@
      * Current filter chain builder.
      */
     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
-    
+
     /**
      * The default thread model.
      */
     private final ThreadModel defaultThreadModel = ExecutorThreadModel.getInstance("AnonymousIoService");
-    
+
     /**
      * Current thread model.
      */
     private ThreadModel threadModel = defaultThreadModel;
-    
+
     public BaseIoServiceConfig()
     {
         super();
@@ -68,7 +68,7 @@
         }
         filterChainBuilder = builder;
     }
-    
+
     public DefaultIoFilterChainBuilder getFilterChain()
     {
         if( filterChainBuilder instanceof DefaultIoFilterChainBuilder )
@@ -81,7 +81,7 @@
                     "Current filter chain builder is not a DefaultIoFilterChainBuilder." );
         }
     }
-    
+
     public ThreadModel getThreadModel()
     {
         return threadModel;
@@ -97,7 +97,9 @@
         }
         this.threadModel = threadModel;
     }
-    
+
+    @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException"})
+    @Override
     public Object clone()
     {
         BaseIoServiceConfig ret;
@@ -109,20 +111,21 @@
         {
             throw ( InternalError ) new InternalError().initCause( e );
         }
-        
+
         // Try to clone the chain builder.
         try
         {
-            Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone", null );
+            Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone" );
             if( cloneMethod.isAccessible() )
             {
-                ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder, null );
+                ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder );
             }
         }
         catch( Exception e )
         {
+            // uncloneable
         }
-        
+
         return ret;
     }
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java Sun Nov  5 11:36:41 2006
@@ -6,50 +6,51 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
-import org.apache.mina.util.Queue;
 
 /**
  * Filter implementation which makes it possible to write {@link InputStream}
- * objects directly using {@link IoSession#write(Object)}. When an 
+ * objects directly using {@link IoSession#write(Object)}. When an
  * {@link InputStream} is written to a session this filter will read the bytes
  * from the stream into {@link ByteBuffer} objects and write those buffers
  * to the next filter. When end of stream has been reached this filter will
  * call {@link NextFilter#messageSent(IoSession, Object)} using the original
- * {@link InputStream} written to the session and notifies 
- * {@link org.apache.mina.common.WriteFuture} on the 
+ * {@link InputStream} written to the session and notifies
+ * {@link org.apache.mina.common.WriteFuture} on the
  * original {@link org.apache.mina.common.IoFilter.WriteRequest}.
- * <p>
+ * <p/>
  * This filter will ignore written messages which aren't {@link InputStream}
  * instances. Such messages will be passed to the next filter directly.
  * </p>
- * <p>
+ * <p/>
  * NOTE: this filter does not close the stream after all data from stream
  * has been written. The {@link org.apache.mina.common.IoHandler} should take
- * care of that in its 
- * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)} 
+ * care of that in its
+ * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)}
  * callback.
  * </p>
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -69,32 +70,33 @@
     protected static final String INITIAL_WRITE_FUTURE = StreamWriteFilter.class.getName() + ".future";
 
     private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
-    
-    public void filterWrite( NextFilter nextFilter, IoSession session, 
-                            WriteRequest writeRequest ) throws Exception 
+
+    @Override
+    public void filterWrite( NextFilter nextFilter, IoSession session,
+                             WriteRequest writeRequest ) throws Exception
     {
         // If we're already processing a stream we need to queue the WriteRequest.
         if( session.getAttribute( CURRENT_STREAM ) != null )
         {
-            Queue queue = ( Queue ) session.getAttribute( WRITE_REQUEST_QUEUE );
+            Queue<WriteRequest> queue = ( Queue<WriteRequest> ) session.getAttribute( WRITE_REQUEST_QUEUE );
             if( queue == null )
             {
-                queue = new Queue();
+                queue = new ConcurrentLinkedQueue<WriteRequest>( );
                 session.setAttribute( WRITE_REQUEST_QUEUE, queue );
             }
-            queue.push( writeRequest );
+            queue.add( writeRequest );
             return;
         }
-        
+
         Object message = writeRequest.getMessage();
-        
+
         if( message instanceof InputStream )
         {
-            
+
             InputStream inputStream = ( InputStream ) message;
-            
+
             ByteBuffer byteBuffer = getNextByteBuffer( inputStream );
-            if ( byteBuffer == null )
+            if( byteBuffer == null )
             {
                 // End of stream reached.
                 writeRequest.getFuture().setWritten( true );
@@ -104,7 +106,7 @@
             {
                 session.setAttribute( CURRENT_STREAM, inputStream );
                 session.setAttribute( INITIAL_WRITE_FUTURE, writeRequest.getFuture() );
-                
+
                 nextFilter.filterWrite( session, new WriteRequest( byteBuffer ) );
             }
 
@@ -115,10 +117,11 @@
         }
     }
 
+    @Override
     public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
     {
         InputStream inputStream = ( InputStream ) session.getAttribute( CURRENT_STREAM );
-        
+
         if( inputStream == null )
         {
             nextFilter.messageSent( session, message );
@@ -126,25 +129,25 @@
         else
         {
             ByteBuffer byteBuffer = getNextByteBuffer( inputStream );
-        
-            if( byteBuffer == null ) 
+
+            if( byteBuffer == null )
             {
                 // End of stream reached.
                 session.removeAttribute( CURRENT_STREAM );
                 WriteFuture writeFuture = ( WriteFuture ) session.removeAttribute( INITIAL_WRITE_FUTURE );
-                
+
                 // Write queued WriteRequests.
-                Queue queue = ( Queue ) session.removeAttribute( WRITE_REQUEST_QUEUE );
+                Queue<? extends WriteRequest> queue = ( Queue<? extends WriteRequest> ) session.removeAttribute( WRITE_REQUEST_QUEUE );
                 if( queue != null )
                 {
-                    WriteRequest wr = ( WriteRequest ) queue.pop();
+                    WriteRequest wr = queue.poll();
                     while( wr != null )
                     {
                         filterWrite( nextFilter, session, wr );
-                        wr = ( WriteRequest ) queue.pop();
+                        wr = queue.poll();
                     }
                 }
-                
+
                 writeFuture.setWritten( true );
                 nextFilter.messageSent( session, inputStream );
             }
@@ -155,32 +158,30 @@
         }
     }
 
-    private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException 
+    private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException
     {
         byte[] bytes = new byte[ writeBufferSize ];
-        
+
         int off = 0;
         int n = 0;
-        while( off < bytes.length && 
-              ( n = is.read( bytes, off, bytes.length - off ) ) != -1 )
+        while( off < bytes.length &&
+            ( n = is.read( bytes, off, bytes.length - off ) ) != -1 )
         {
             off += n;
         }
-        
+
         if( n == -1 && off == 0 )
         {
             return null;
         }
 
-        ByteBuffer buffer = ByteBuffer.wrap( bytes, 0, off );
-
-        return buffer;
+        return ByteBuffer.wrap( bytes, 0, off );
     }
 
     /**
-     * Returns the size of the write buffer in bytes. Data will be read from the 
+     * Returns the size of the write buffer in bytes. Data will be read from the
      * stream in chunks of this size and then written to the next filter.
-     * 
+     *
      * @return the write buffer size.
      */
     public int getWriteBufferSize()
@@ -189,9 +190,9 @@
     }
 
     /**
-     * Sets the size of the write buffer in bytes. Data will be read from the 
+     * Sets the size of the write buffer in bytes. Data will be read from the
      * stream in chunks of this size and then written to the next filter.
-     * 
+     *
      * @throws IllegalArgumentException if the specified size is &lt; 1.
      */
     public void setWriteBufferSize( int writeBufferSize )
@@ -202,6 +203,6 @@
         }
         this.writeBufferSize = writeBufferSize;
     }
-    
-    
+
+
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.codec;
 
@@ -35,7 +35,7 @@
  * An {@link IoFilter} which translates binary or protocol specific data into
  * message object and vice versa using {@link ProtocolCodecFactory},
  * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -43,12 +43,12 @@
 {
     public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
     public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
-    
+
     private static final Class[] EMPTY_PARAMS = new Class[0];
     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
 
     private final ProtocolCodecFactory factory;
-    
+
     public ProtocolCodecFilter( ProtocolCodecFactory factory )
     {
         if( factory == null )
@@ -57,7 +57,7 @@
         }
         this.factory = factory;
     }
-    
+
     public ProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
     {
         if( encoder == null )
@@ -68,7 +68,7 @@
         {
             throw new NullPointerException( "decoder" );
         }
-        
+
         this.factory = new ProtocolCodecFactory()
         {
             public ProtocolEncoder getEncoder()
@@ -82,7 +82,7 @@
             }
         };
     }
-    
+
     public ProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
     {
         if( encoderClass == null )
@@ -117,7 +117,7 @@
         {
             throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
         }
-        
+
         this.factory = new ProtocolCodecFactory()
         {
             public ProtocolEncoder getEncoder() throws Exception
@@ -132,6 +132,7 @@
         };
     }
 
+    @Override
     public void onPreAdd( IoFilterChain parent, String name, NextFilter nextFilter ) throws Exception
     {
         if( parent.contains( ProtocolCodecFilter.class ) )
@@ -140,6 +141,7 @@
         }
     }
 
+    @Override
     public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
     {
         if( !( message instanceof ByteBuffer ) )
@@ -151,7 +153,7 @@
         ByteBuffer in = ( ByteBuffer ) message;
         ProtocolDecoder decoder = getDecoder( session );
         ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
-        
+
         try
         {
             decoder.decode( session, in, decoderOut );
@@ -185,6 +187,7 @@
         }
     }
 
+    @Override
     public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
     {
         if( message instanceof HiddenByteBuffer )
@@ -200,7 +203,8 @@
 
         nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
     }
-    
+
+    @Override
     public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
     {
         Object message = writeRequest.getMessage();
@@ -212,7 +216,7 @@
 
         ProtocolEncoder encoder = getEncoder( session );
         ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
-        
+
         try
         {
             encoder.encode( session, message, encoderOut );
@@ -245,7 +249,8 @@
             }
         }
     }
-    
+
+    @Override
     public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
     {
         // Call finishDecode() first when a connection is closed.
@@ -276,7 +281,7 @@
 
             decoderOut.flush();
         }
-        
+
         nextFilter.sessionClosed( session );
     }
 
@@ -290,12 +295,12 @@
         }
         return encoder;
     }
-    
+
     private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
     {
         return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
     }
-    
+
     private ProtocolDecoder getDecoder( IoSession session ) throws Exception
     {
         ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
@@ -306,12 +311,12 @@
         }
         return decoder;
     }
-    
+
     private ProtocolDecoderOutput getDecoderOut( IoSession session, NextFilter nextFilter )
     {
         return new SimpleProtocolDecoderOutput( session, nextFilter );
     }
-    
+
     private void disposeEncoder( IoSession session )
     {
         ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
@@ -361,41 +366,44 @@
             super( buf );
         }
     }
-    
+
     private static class MessageByteBuffer extends ByteBufferProxy
     {
         private final Object message;
-        
+
         private MessageByteBuffer( Object message )
         {
             super( EMPTY_BUFFER );
             this.message = message;
         }
 
+        @Override
         public void acquire()
         {
             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
         }
 
+        @Override
         public void release()
         {
             // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
         }
     }
-    
+
     private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
     {
         private final IoSession session;
         private final NextFilter nextFilter;
         private final WriteRequest writeRequest;
-        
-        public ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
+
+        ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
         {
             this.session = session;
             this.nextFilter = nextFilter;
             this.writeRequest = writeRequest;
         }
 
+        @Override
         protected WriteFuture doFlush( ByteBuffer buf )
         {
             WriteFuture future = new DefaultWriteFuture( session );

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java Sun Nov  5 11:36:41 2006
@@ -6,28 +6,30 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.codec.support;
 
-import org.apache.mina.common.IoSession;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.IoSession;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.util.Queue;
 
 /**
  * A {@link ProtocolDecoderOutput} based on queue.
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  *
@@ -36,17 +38,17 @@
 {
     private final NextFilter nextFilter;
     private final IoSession session;
-    private final Queue messageQueue = new Queue();
-    
+    private final List<Object> messageQueue = new ArrayList<Object>();
+
     public SimpleProtocolDecoderOutput( IoSession session, NextFilter nextFilter )
     {
         this.nextFilter = nextFilter;
         this.session = session;
     }
-    
+
     public void write( Object message )
     {
-        messageQueue.push( message );
+        messageQueue.add( message );
         if( session instanceof BaseIoSession )
         {
             ( ( BaseIoSession ) session ).increaseReadMessages();
@@ -57,8 +59,8 @@
     {
         while( !messageQueue.isEmpty() )
         {
-            nextFilter.messageReceived( session, messageQueue.pop() );
+            nextFilter.messageReceived( session, messageQueue.remove(0) );
         }
-        
+
     }
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java Sun Nov  5 11:36:41 2006
@@ -6,23 +6,25 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.filter.codec.support;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.util.Queue;
 
 /**
  * A {@link ProtocolEncoderOutput} based on queue.
@@ -32,88 +34,73 @@
  */
 public abstract class SimpleProtocolEncoderOutput implements ProtocolEncoderOutput
 {
-    private final Queue bufferQueue = new Queue();
-    
+    private final List<ByteBuffer> bufferQueue = new ArrayList<ByteBuffer>();
+
     public SimpleProtocolEncoderOutput()
     {
     }
-    
-    public Queue getBufferQueue()
+
+    public List<ByteBuffer> getBufferQueue()
     {
         return bufferQueue;
     }
-    
+
     public void write( ByteBuffer buf )
     {
-        bufferQueue.push( buf );
+        bufferQueue.add( buf );
     }
-    
+
     public void mergeAll()
     {
-        int sum = 0;
         final int size = bufferQueue.size();
-        
+
         if( size < 2 )
         {
             // no need to merge!
             return;
         }
-        
+
         // Get the size of merged BB
+        int sum = 0;
         for( int i = size - 1; i >= 0; i -- )
         {
-            sum += ( ( ByteBuffer ) bufferQueue.get( i ) ).remaining();
+            sum += bufferQueue.get( i ).remaining();
         }
-        
+
         // Allocate a new BB that will contain all fragments
         ByteBuffer newBuf = ByteBuffer.allocate( sum );
-        
+
         // and merge all.
-        for( ;; )
+        for( ; !bufferQueue.isEmpty(); )
         {
-            ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop();
-            if( buf == null )
-            {
-                break;
-            }
-    
+            ByteBuffer buf = bufferQueue.remove( 0 );
+
             newBuf.put( buf );
             buf.release();
         }
-        
+
         // Push the new buffer finally.
         newBuf.flip();
-        bufferQueue.push(newBuf);
+        bufferQueue.add( newBuf );
     }
-    
+
     public WriteFuture flush()
     {
-        Queue bufferQueue = this.bufferQueue;
         WriteFuture future = null;
-        if( bufferQueue.isEmpty() )
-        {
-            return null;
-        }
-        else
+
+        for( ; !bufferQueue.isEmpty(); )
         {
-            for( ;; )
+            ByteBuffer buf = bufferQueue.remove( 0 );
+
+            // Flush only when the buffer has remaining.
+            if( buf.hasRemaining() )
             {
-                ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop();
-                if( buf == null )
-                {
-                    break;
-                }
-                
-                // Flush only when the buffer has remaining.
-                if( buf.hasRemaining() )
-                {
-                    future = doFlush( buf );
-                }
+                future = doFlush( buf );
             }
         }
-        
+
         return future;
     }
-    
+
     protected abstract WriteFuture doFlush( ByteBuffer buf );
 }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Nov  5 11:36:41 2006
@@ -27,21 +27,23 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
 import org.apache.mina.util.NamePreservingRunnable;
-import java.util.concurrent.Executor;
+import org.apache.mina.util.NewThreadExecutor;
 
 /**
  * {@link IoAcceptor} for socket transport (TCP/IP).
@@ -51,27 +53,21 @@
  */
 public class SocketAcceptor extends BaseIoAcceptor
 {
-    /**
-     * @noinspection StaticNonFinalField
-     */
-    private static volatile int nextId = 0;
+    private static final AtomicInteger nextId = new AtomicInteger();
 
     private final Executor executor;
     private final Object lock = new Object();
-    private final int id = nextId ++;
+    private final int id = nextId.getAndIncrement();
     private final String threadName = "SocketAcceptor-" + id;
     private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
-    private final Map channels = new HashMap();
+    private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
 
-    private final Queue registerQueue = new Queue();
-    private final Queue cancelQueue = new Queue();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
 
     private final SocketIoProcessor[] ioProcessors;
     private final int processorCount;
 
-    /**
-     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
-     */
     private Selector selector;
     private Worker worker;
     private int processorDistributor = 0;
@@ -88,7 +84,7 @@
      * Create an acceptor with the desired number of processing threads
      *
      * @param processorCount Number of processing threads
-     * @param executor Executor to use for launching threads
+     * @param executor       Executor to use for launching threads
      */
     public SocketAcceptor( int processorCount, Executor executor )
     {
@@ -132,10 +128,7 @@
 
         RegistrationRequest request = new RegistrationRequest( address, handler, config );
 
-        synchronized( registerQueue )
-        {
-            registerQueue.push( request );
-        }
+        registerQueue.add( request );
 
         startupWorker();
 
@@ -199,10 +192,7 @@
             throw new IllegalArgumentException( "Address not bound: " + address );
         }
 
-        synchronized( cancelQueue )
-        {
-            cancelQueue.push( request );
-        }
+        cancelQueue.add( request );
 
         selector.wakeup();
 
@@ -231,15 +221,11 @@
 
     public void unbindAll()
     {
-        List addresses;
-        synchronized( channels )
-        {
-            addresses = new ArrayList( channels.keySet() );
-        }
+        List<SocketAddress> addresses = new ArrayList<SocketAddress>( channels.keySet() );
 
-        for( Iterator i = addresses.iterator(); i.hasNext(); )
+        for( SocketAddress address : addresses )
         {
-            unbind( ( SocketAddress ) i.next() );
+            unbind( address );
         }
     }
 
@@ -247,7 +233,7 @@
     {
         public void run()
         {
-            Thread.currentThread().setName(SocketAcceptor.this.threadName );
+            Thread.currentThread().setName( SocketAcceptor.this.threadName );
 
             for( ; ; )
             {
@@ -306,12 +292,12 @@
             }
         }
 
-        private void processSessions( Set keys ) throws IOException
+        private void processSessions( Set<SelectionKey> keys ) throws IOException
         {
-            Iterator it = keys.iterator();
+            Iterator<SelectionKey> it = keys.iterator();
             while( it.hasNext() )
             {
-                SelectionKey key = ( SelectionKey ) it.next();
+                SelectionKey key = it.next();
 
                 it.remove();
 
@@ -334,8 +320,8 @@
                 {
                     RegistrationRequest req = ( RegistrationRequest ) key.attachment();
                     SocketSessionImpl session = new SocketSessionImpl(
-                            SocketAcceptor.this, nextProcessor(), getListeners(),
-                            req.config, ch, req.handler, req.address );
+                        SocketAcceptor.this, nextProcessor(), getListeners(),
+                        req.config, ch, req.handler, req.address );
                     getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
                     req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
                     req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
@@ -391,12 +377,7 @@
 
         for( ; ; )
         {
-            RegistrationRequest req;
-
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
+            RegistrationRequest req = registerQueue.poll();
 
             if( req == null )
             {
@@ -433,13 +414,10 @@
                 }
                 ssc.register( selector, SelectionKey.OP_ACCEPT, req );
 
-                synchronized( channels )
-                {
-                    channels.put( req.address, ssc );
-                }
+                channels.put( req.address, ssc );
 
                 getListeners().fireServiceActivated(
-                        this, req.address, req.handler, req.config );
+                    this, req.address, req.handler, req.config );
             }
             catch( IOException e )
             {
@@ -479,23 +457,14 @@
 
         for( ; ; )
         {
-            CancellationRequest request;
-
-            synchronized( cancelQueue )
-            {
-                request = ( CancellationRequest ) cancelQueue.pop();
-            }
+            CancellationRequest request = cancelQueue.poll();
 
             if( request == null )
             {
                 break;
             }
 
-            ServerSocketChannel ssc;
-            synchronized( channels )
-            {
-                ssc = ( ServerSocketChannel ) channels.remove( request.address );
-            }
+            ServerSocketChannel ssc = channels.remove( request.address );
 
             // close the channel
             try
@@ -530,9 +499,9 @@
                 if( request.exception == null )
                 {
                     getListeners().fireServiceDeactivated(
-                            this, request.address,
-                            request.registrationRequest.handler,
-                            request.registrationRequest.config );
+                        this, request.address,
+                        request.registrationRequest.handler,
+                        request.registrationRequest.config );
                 }
             }
         }

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Nov  5 11:36:41 2006
@@ -26,8 +26,11 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Iterator;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.ExceptionMonitor;
@@ -38,10 +41,8 @@
 import org.apache.mina.common.support.AbstractIoFilterChain;
 import org.apache.mina.common.support.BaseIoConnector;
 import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
 import org.apache.mina.util.NamePreservingRunnable;
-import java.util.concurrent.Executor;
+import org.apache.mina.util.NewThreadExecutor;
 
 /**
  * {@link IoConnector} for socket transport (TCP/IP).
@@ -51,23 +52,17 @@
  */
 public class SocketConnector extends BaseIoConnector
 {
-    /**
-     * @noinspection StaticNonFinalField
-     */
-    private static volatile int nextId = 0;
+    private static final AtomicInteger nextId = new AtomicInteger( );
 
     private final Object lock = new Object();
-    private final int id = nextId++;
+    private final int id = nextId.getAndIncrement();
     private final String threadName = "SocketConnector-" + id;
     private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
-    private final Queue connectQueue = new Queue();
+    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>( );
     private final SocketIoProcessor[] ioProcessors;
     private final int processorCount;
     private final Executor executor;
 
-    /**
-     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
-     */
     private Selector selector;
     private Worker worker;
     private int processorDistributor = 0;
@@ -218,10 +213,7 @@
             }
         }
 
-        synchronized( connectQueue )
-        {
-            connectQueue.push( request );
-        }
+        connectQueue.add( request );
         selector.wakeup();
 
         return request;
@@ -264,11 +256,7 @@
 
         for( ; ; )
         {
-            ConnectionRequest req;
-            synchronized( connectQueue )
-            {
-                req = ( ConnectionRequest ) connectQueue.pop();
-            }
+            ConnectionRequest req = connectQueue.poll();
 
             if( req == null )
                 break;
@@ -285,14 +273,10 @@
         }
     }
 
-    private void processSessions( Set keys )
+    private void processSessions( Set<SelectionKey> keys )
     {
-        Iterator it = keys.iterator();
-
-        while( it.hasNext() )
+        for( SelectionKey key : keys )
         {
-            SelectionKey key = ( SelectionKey ) it.next();
-
             if( !key.isConnectable() )
                 continue;
 
@@ -330,15 +314,12 @@
         keys.clear();
     }
 
-    private void processTimedOutSessions( Set keys )
+    private void processTimedOutSessions( Set<SelectionKey> keys )
     {
         long currentTime = System.currentTimeMillis();
-        Iterator it = keys.iterator();
 
-        while( it.hasNext() )
+        for( SelectionKey key : keys )
         {
-            SelectionKey key = ( SelectionKey ) it.next();
-
             if( !key.isValid() )
                 continue;
 

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Sun Nov  5 11:36:41 2006
@@ -6,31 +6,31 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
 import java.io.IOException;
+import java.util.Queue;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
 
 /**
  * An {@link IoFilterChain} for socket transport (TCP/IP).
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  */
 class SocketFilterChain extends AbstractIoFilterChain {
@@ -40,17 +40,18 @@
         super( parent );
     }
 
-    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    @Override
+	protected void doWrite( IoSession session, WriteRequest writeRequest )
     {
         SocketSessionImpl s = ( SocketSessionImpl ) session;
-        Queue writeRequestQueue = s.getWriteRequestQueue();
-        
+        Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
+
         // SocketIoProcessor.doFlush() will reset it after write is finished
-        // because the buffer will be passed with messageSent event. 
+        // because the buffer will be passed with messageSent event.
         ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
         synchronized( writeRequestQueue )
         {
-            writeRequestQueue.push( writeRequest );
+			writeRequestQueue.add( writeRequest );
             if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
             {
                 // Notify SocketIoProcessor only when writeRequestQueue was empty.
@@ -59,7 +60,8 @@
         }
     }
 
-    protected void doClose( IoSession session ) throws IOException
+    @Override
+	protected void doClose( IoSession session ) throws IOException
     {
         SocketSessionImpl s = ( SocketSessionImpl ) session;
         s.getIoProcessor().remove( s );

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Nov  5 11:36:41 2006
@@ -23,8 +23,9 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.Iterator;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.ByteBuffer;
@@ -33,7 +34,6 @@
 import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
 
 /**
  * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
@@ -47,15 +47,12 @@
 
     private final String threadName;
     private final Executor executor;
-    /**
-     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
-     */
     private Selector selector;
 
-    private final Queue newSessions = new Queue();
-    private final Queue removingSessions = new Queue();
-    private final Queue flushingSessions = new Queue();
-    private final Queue trafficControllingSessions = new Queue();
+    private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+    private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
 
     private Worker worker;
     private long lastIdleCheckTime = System.currentTimeMillis();
@@ -68,10 +65,7 @@
 
     void addNew( SocketSessionImpl session ) throws IOException
     {
-        synchronized( newSessions )
-        {
-            newSessions.push( session );
-        }
+        newSessions.add( session );
 
         startupWorker();
 
@@ -120,41 +114,24 @@
 
     private void scheduleRemove( SocketSessionImpl session )
     {
-        synchronized( removingSessions )
-        {
-            removingSessions.push( session );
-        }
+        removingSessions.add( session );
     }
 
     private void scheduleFlush( SocketSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
+        flushingSessions.add( session );
     }
 
     private void scheduleTrafficControl( SocketSessionImpl session )
     {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.push( session );
-        }
+        trafficControllingSessions.add( session );
     }
 
     private void doAddNew()
     {
-        if( newSessions.isEmpty() )
-            return;
-
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( newSessions )
-            {
-                session = ( SocketSessionImpl ) newSessions.pop();
-            }
+            SocketSessionImpl session = newSessions.poll();
 
             if( session == null )
                 break;
@@ -164,8 +141,8 @@
             {
                 ch.configureBlocking( false );
                 session.setSelectionKey( ch.register( selector,
-                                                      SelectionKey.OP_READ,
-                                                      session ) );
+                    SelectionKey.OP_READ,
+                    session ) );
 
                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
                 // in AbstractIoFilterChain.fireSessionOpened().
@@ -182,17 +159,9 @@
 
     private void doRemove()
     {
-        if( removingSessions.isEmpty() )
-            return;
-
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( removingSessions )
-            {
-                session = ( SocketSessionImpl ) removingSessions.pop();
-            }
+            SocketSessionImpl session = removingSessions.poll();
 
             if( session == null )
                 break;
@@ -229,13 +198,10 @@
         }
     }
 
-    private void process( Set selectedKeys )
+    private void process( Set<SelectionKey> selectedKeys )
     {
-        Iterator it = selectedKeys.iterator();
-
-        while( it.hasNext() )
+        for( SelectionKey key : selectedKeys )
         {
-            SelectionKey key = ( SelectionKey ) it.next();
             SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
 
             if( key.isReadable() && session.getTrafficMask().isReadable() )
@@ -308,12 +274,11 @@
         if( ( currentTime - lastIdleCheckTime ) >= 1000 )
         {
             lastIdleCheckTime = currentTime;
-            Set keys = selector.keys();
+            Set<SelectionKey> keys = selector.keys();
             if( keys != null )
             {
-                for( Iterator it = keys.iterator(); it.hasNext(); )
+                for( SelectionKey key : keys )
                 {
-                    SelectionKey key = ( SelectionKey ) it.next();
                     SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
                     notifyIdleness( session, currentTime );
                 }
@@ -371,17 +336,9 @@
 
     private void doFlush()
     {
-        if( flushingSessions.size() == 0 )
-            return;
-
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( SocketSessionImpl ) flushingSessions.pop();
-            }
+            SocketSessionImpl session = flushingSessions.poll();
 
             if( session == null )
                 break;
@@ -420,10 +377,10 @@
 
     private void releaseWriteBuffers( SocketSessionImpl session )
     {
-        Queue writeRequestQueue = session.getWriteRequestQueue();
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
+        while( ( req = writeRequestQueue.poll() ) != null )
         {
             try
             {
@@ -447,16 +404,11 @@
         key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
 
         SocketChannel ch = session.getChannel();
-        Queue writeRequestQueue = session.getWriteRequestQueue();
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
         for( ; ; )
         {
-            WriteRequest req;
-
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
+            WriteRequest req = writeRequestQueue.peek();
 
             if( req == null )
                 break;
@@ -464,10 +416,7 @@
             ByteBuffer buf = ( ByteBuffer ) req.getMessage();
             if( buf.remaining() == 0 )
             {
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
+                writeRequestQueue.poll();
 
                 session.increaseWrittenMessages();
 
@@ -498,12 +447,7 @@
 
         for( ; ; )
         {
-            SocketSessionImpl session;
-
-            synchronized( trafficControllingSessions )
-            {
-                session = ( SocketSessionImpl ) trafficControllingSessions.pop();
-            }
+            SocketSessionImpl session = trafficControllingSessions.poll();
 
             if( session == null )
                 break;
@@ -526,7 +470,7 @@
             // The normal is OP_READ and, if there are write requests in the
             // session's write queue, set OP_WRITE to trigger flushing.
             int ops = SelectionKey.OP_READ;
-            Queue writeRequestQueue = session.getWriteRequestQueue();
+            Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
             synchronized( writeRequestQueue )
             {
                 if( !writeRequestQueue.isEmpty() )

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sun Nov  5 11:36:41 2006
@@ -6,16 +6,16 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *  
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License. 
- *  
+ *  under the License.
+ *
  */
 package org.apache.mina.transport.socket.nio;
 
@@ -23,7 +23,10 @@
 import java.net.SocketException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -32,11 +35,10 @@
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.common.support.BaseIoSessionConfig;
 import org.apache.mina.common.support.IoServiceListenerSupport;
-import org.apache.mina.util.Queue;
 
 /**
  * An {@link IoSession} for socket transport (TCP/IP).
@@ -52,7 +54,7 @@
     private final SocketIoProcessor ioProcessor;
     private final SocketFilterChain filterChain;
     private final SocketChannel ch;
-    private final Queue writeRequestQueue;
+    private final Queue<WriteRequest> writeRequestQueue;
     private final IoHandler handler;
     private final SocketAddress remoteAddress;
     private final SocketAddress localAddress;
@@ -77,7 +79,7 @@
         this.ioProcessor = ioProcessor;
         this.filterChain = new SocketFilterChain( this );
         this.ch = ch;
-        this.writeRequestQueue = new Queue();
+        this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
         this.handler = defaultHandler;
         this.remoteAddress = ch.socket().getRemoteSocketAddress();
         this.localAddress = ch.socket().getLocalSocketAddress();
@@ -109,7 +111,7 @@
     {
         return manager;
     }
-    
+
     public IoServiceConfig getServiceConfig()
     {
         return serviceConfig;
@@ -149,18 +151,19 @@
     {
         this.key = key;
     }
-    
+
     public IoHandler getHandler()
     {
         return handler;
     }
 
+    @Override
     protected void close0()
     {
         filterChain.fireFilterClose( this );
     }
 
-    Queue getWriteRequestQueue()
+    Queue<WriteRequest> getWriteRequestQueue()
     {
         return writeRequestQueue;
     }
@@ -173,14 +176,25 @@
         }
     }
 
+    /**
+     * Returns the sum of the <tt>remaining()</tt> byte counts of the {@link ByteBuffer}
+     * messages carried by the requests held in the write request queue.
+     *
+     * @throws ClassCastException if a queued request's message is not a {@link ByteBuffer}
+     */
     public int getScheduledWriteBytes()
     {
-        synchronized( writeRequestQueue )
+        int byteSize = 0;
+
+        for( WriteRequest request : writeRequestQueue )
         {
-            return writeRequestQueue.byteSize();
+            byteSize += ( ( ByteBuffer ) request.getMessage() ).remaining();
         }
+
+        return byteSize;
     }
 
+    @Override
     protected void write0( WriteRequest writeRequest )
     {
         filterChain.fireFilterWrite( this, writeRequest );
@@ -206,6 +220,7 @@
         return serviceAddress;
     }
 
+    @Override
     protected void updateTrafficMask()
     {
         this.ioProcessor.updateTrafficMask( this );
@@ -216,6 +231,7 @@
         return readBufferSize;
     }
 
+    @SuppressWarnings( {"CloneableClassWithoutClone"} )
     private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
     {
         public boolean isKeepAlive()

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Sun Nov  5 11:36:41 2006
@@ -26,29 +26,31 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.support.BaseIoAcceptor;
 import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
 import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
-
-import java.util.concurrent.Executor;
 
 /**
  * {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -58,17 +60,18 @@
  */
 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramService
 {
-    private static volatile int nextId = 0;
+    private static final AtomicInteger nextId = new AtomicInteger();
 
+    private final Object lock = new Object();
     private final IoAcceptor wrapper;
     private final Executor executor;
-    private final int id = nextId ++ ;
+    private final int id = nextId.getAndIncrement();
     private Selector selector;
     private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
-    private final Map channels = new HashMap();
-    private final Queue registerQueue = new Queue();
-    private final Queue cancelQueue = new Queue();
-    private final Queue flushingSessions = new Queue();
+    private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
+    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+    private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
     private Worker worker;
 
     /**
@@ -81,7 +84,7 @@
     }
 
     public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config )
-            throws IOException
+        throws IOException
     {
         if( handler == null )
             throw new NullPointerException( "handler" );
@@ -92,17 +95,12 @@
 
         if( address != null && !( address instanceof InetSocketAddress ) )
             throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
+                + address.getClass() );
 
         RegistrationRequest request = new RegistrationRequest( address, handler, config );
-        synchronized( this )
-        {
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
-            startupWorker();
-        }
+        registerQueue.add( request );
+        startupWorker();
+
         selector.wakeup();
 
         synchronized( request )
@@ -115,6 +113,7 @@
                 }
                 catch( InterruptedException e )
                 {
+                    throw new RuntimeIOException( e );
                 }
             }
         }
@@ -131,26 +130,20 @@
             throw new NullPointerException( "address" );
 
         CancellationRequest request = new CancellationRequest( address );
-        synchronized( this )
+        try
         {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply throw
-                // IllegalArgumentException here because we can simply
-                // conclude that nothing is bound to the selector.
-                throw new IllegalArgumentException( "Address not bound: " + address );
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( request );
-            }
+            startupWorker();
         }
+        catch( IOException e )
+        {
+            // IOException is thrown only when Worker thread is not
+            // running and failed to open a selector.  We simply throw
+            // IllegalArgumentException here because we can simply
+            // conclude that nothing is bound to the selector.
+            throw new IllegalArgumentException( "Address not bound: " + address );
+        }
+
+        cancelQueue.add( request );
         selector.wakeup();
 
         synchronized( request )
@@ -163,30 +156,28 @@
                 }
                 catch( InterruptedException e )
                 {
+                    throw new RuntimeIOException( e );
                 }
             }
         }
 
         if( request.exception != null )
         {
-            throw new RuntimeException( "Failed to unbind" , request.exception );
+            throw new RuntimeException( "Failed to unbind", request.exception );
         }
     }
 
     public void unbindAll()
     {
-        List addresses;
-        synchronized( channels )
-        {
-            addresses = new ArrayList( channels.keySet() );
-        }
+        List<SocketAddress> addresses = new ArrayList<SocketAddress>( channels.keySet() );
 
-        for( Iterator i = addresses.iterator(); i.hasNext(); )
+        for( SocketAddress address : addresses )
         {
-            unbind( ( SocketAddress ) i.next() );
+            unbind( address );
         }
     }
 
+    @Override
     public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
     {
         if( remoteAddress == null )
@@ -199,7 +190,7 @@
         }
 
         Selector selector = this.selector;
-        DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
+        DatagramChannel ch = channels.get( localAddress );
         if( selector == null || ch == null )
         {
             throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
@@ -214,7 +205,7 @@
         RegistrationRequest req = ( RegistrationRequest ) key.attachment();
         IoSession session;
         IoSessionRecycler sessionRecycler = getSessionRecycler( req );
-        synchronized ( sessionRecycler )
+        synchronized( sessionRecycler )
         {
             session = sessionRecycler.recycle( localAddress, remoteAddress );
             if( session != null )
@@ -224,9 +215,9 @@
 
             // If a new session needs to be created.
             DatagramSessionImpl datagramSession = new DatagramSessionImpl(
-                    wrapper, this,
-                    req.config, ch, req.handler,
-                    req.address );
+                wrapper, this,
+                req.config, ch, req.handler,
+                req.address );
             datagramSession.setRemoteAddress( remoteAddress );
             datagramSession.setSelectionKey( key );
 
@@ -261,6 +252,7 @@
         return sessionRecycler;
     }
 
+    @Override
     public IoServiceListenerSupport getListeners()
     {
         return super.getListeners();
@@ -293,13 +285,16 @@
         this.defaultConfig = defaultConfig;
     }
 
-    private synchronized void startupWorker() throws IOException
+    private void startupWorker() throws IOException
     {
-        if( worker == null )
+        synchronized( lock )
         {
-            selector = Selector.open();
-            worker = new Worker();
-            executor.execute( new NamePreservingRunnable( worker ) );
+            if( worker == null )
+            {
+                selector = Selector.open();
+                worker = new Worker();
+                executor.execute( new NamePreservingRunnable( worker ) );
+            }
         }
     }
 
@@ -319,10 +314,7 @@
 
     private void scheduleFlush( DatagramSessionImpl session )
     {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
+        flushingSessions.add( session );
     }
 
     private class Worker implements Runnable
@@ -331,7 +323,7 @@
         {
             Thread.currentThread().setName( "DatagramAcceptor-" + id );
 
-            for( ;; )
+            for( ; ; )
             {
                 try
                 {
@@ -349,7 +341,7 @@
 
                     if( selector.keys().isEmpty() )
                     {
-                        synchronized( DatagramAcceptorDelegate.this )
+                        synchronized( lock )
                         {
                             if( selector.keys().isEmpty() &&
                                 registerQueue.isEmpty() &&
@@ -383,18 +375,19 @@
                     }
                     catch( InterruptedException e1 )
                     {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
                     }
                 }
             }
         }
     }
 
-    private void processReadySessions( Set keys )
+    private void processReadySessions( Set<SelectionKey> keys )
     {
-        Iterator it = keys.iterator();
+        Iterator<SelectionKey> it = keys.iterator();
         while( it.hasNext() )
         {
-            SelectionKey key = ( SelectionKey ) it.next();
+            SelectionKey key = it.next();
             it.remove();
 
             DatagramChannel ch = ( DatagramChannel ) key.channel();
@@ -409,10 +402,9 @@
 
                 if( key.isWritable() )
                 {
-                    for( Iterator i = getManagedSessions( req.address ).iterator();
-                         i.hasNext(); )
+                    for( Object o : getManagedSessions( req.address ) )
                     {
-                        scheduleFlush( ( DatagramSessionImpl ) i.next() );
+                        scheduleFlush( ( DatagramSessionImpl ) o );
                     }
                 }
             }
@@ -426,7 +418,7 @@
     private void readSession( DatagramChannel channel, RegistrationRequest req ) throws Exception
     {
         ByteBuffer readBuf = ByteBuffer.allocate(
-                ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
+            ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
         try
         {
             SocketAddress remoteAddress = channel.receive(
@@ -457,14 +449,9 @@
         if( flushingSessions.size() == 0 )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            DatagramSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( DatagramSessionImpl ) flushingSessions.pop();
-            }
+            DatagramSessionImpl session = flushingSessions.poll();
 
             if( session == null )
                 break;
@@ -484,15 +471,11 @@
     {
         DatagramChannel ch = session.getChannel();
 
-        Queue writeRequestQueue = session.getWriteRequestQueue();
+        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for( ;; )
+        for( ; ; )
         {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
+            WriteRequest req = writeRequestQueue.peek();
 
             if( req == null )
                 break;
@@ -501,14 +484,11 @@
             if( buf.remaining() == 0 )
             {
                 // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
+                writeRequestQueue.poll();
 
                 session.increaseWrittenMessages();
                 buf.reset();
-                ( ( DatagramFilterChain ) session.getFilterChain() ).fireMessageSent( session, req );
+                session.getFilterChain().fireMessageSent( session, req );
                 continue;
             }
 
@@ -539,13 +519,10 @@
             else if( writtenBytes > 0 )
             {
                 key.interestOps( key.interestOps()
-                                 & ( ~SelectionKey.OP_WRITE ) );
+                    & ( ~SelectionKey.OP_WRITE ) );
 
                 // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
+                writeRequestQueue.poll();
 
                 session.increaseWrittenBytes( writtenBytes );
                 session.increaseWrittenMessages();
@@ -560,13 +537,9 @@
         if( registerQueue.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
+            RegistrationRequest req = registerQueue.poll();
 
             if( req == null )
                 break;
@@ -602,13 +575,10 @@
                     req.address = ( InetSocketAddress ) ch.socket().getLocalSocketAddress();
                 }
                 ch.register( selector, SelectionKey.OP_READ, req );
-                synchronized( channels )
-                {
-                    channels.put( req.address, ch );
-                }
+                channels.put( req.address, ch );
 
                 getListeners().fireServiceActivated(
-                        this, req.address, req.handler, req.config);
+                    this, req.address, req.handler, req.config );
             }
             catch( Throwable t )
             {
@@ -643,24 +613,16 @@
         if( cancelQueue.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
-            CancellationRequest request;
-            synchronized( cancelQueue )
-            {
-                request = ( CancellationRequest ) cancelQueue.pop();
-            }
+            CancellationRequest request = cancelQueue.poll();
 
             if( request == null )
             {
                 break;
             }
 
-            DatagramChannel ch;
-            synchronized( channels )
-            {
-                ch = ( DatagramChannel ) channels.remove( request.address );
-            }
+            DatagramChannel ch = channels.remove( request.address );
 
             // close the channel
             try
@@ -668,7 +630,7 @@
                 if( ch == null )
                 {
                     request.exception = new IllegalArgumentException(
-                            "Address not bound: " + request.address );
+                        "Address not bound: " + request.address );
                 }
                 else
                 {
@@ -695,9 +657,9 @@
                 if( request.exception == null )
                 {
                     getListeners().fireServiceDeactivated(
-                            this, request.address,
-                            request.registrationRequest.handler,
-                            request.registrationRequest.config );
+                        this, request.address,
+                        request.registrationRequest.handler,
+                        request.registrationRequest.config );
                 }
             }
         }



Mime
View raw message