Author: proyal
Date: Sun Nov 5 11:36:41 2006
New Revision: 471502
URL: http://svn.apache.org/viewvc?view=rev&rev=471502
Log:
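Remove MINA's own Queue and BlockingQueue classes in favor of java.util.Queue. Replace explicit synchronization with concurrent implementations (ConcurrentLinkedQueue, ConcurrentHashMap, AtomicInteger).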
Removed:
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/BlockingQueue.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/Queue.java
directory/branches/mina/1.2/core/src/test/java/org/apache/mina/util/QueueTest.java
Modified:
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
directory/branches/mina/1.2/core/src/main/java/org/apache/mina/util/ExpiringMap.java
directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
directory/branches/mina/1.2/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineEncoderTest.java
directory/branches/mina/1.2/example/src/main/java/org/apache/mina/example/httpserver/codec/HttpRequestDecoder.java
directory/branches/mina/1.2/filter-ssl/src/main/java/org/apache/mina/filter/support/SSLHandler.java
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/IoServiceConfig.java Sun Nov 5 11:36:41 2006
@@ -6,32 +6,32 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.common;
/**
* A configuration which is used to configure {@link IoService}.
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public interface IoServiceConfig
+public interface IoServiceConfig extends Cloneable
{
/**
* Resturns the default configuration of the new {@link IoSession}s.
*/
IoSessionConfig getSessionConfig();
-
+
/**
* Returns the {@link IoFilterChainBuilder} which will modify the
* {@link IoFilterChain} of all {@link IoSession}s which is created
@@ -39,7 +39,7 @@
* The default value is an empty {@link DefaultIoFilterChainBuilder}.
*/
IoFilterChainBuilder getFilterChainBuilder();
-
+
/**
* Sets the {@link IoFilterChainBuilder} which will modify the
* {@link IoFilterChain} of all {@link IoSession}s which is created
@@ -48,7 +48,7 @@
* an empty {@link DefaultIoFilterChainBuilder}.
*/
void setFilterChainBuilder( IoFilterChainBuilder builder );
-
+
/**
* A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
* Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
@@ -60,7 +60,7 @@
* not a {@link DefaultIoFilterChainBuilder}
*/
DefaultIoFilterChainBuilder getFilterChain();
-
+
/**
* Returns the default {@link ThreadModel} of the {@link IoService}.
* The default value is a {@link ExecutorThreadModel}() whose service name is
@@ -69,7 +69,7 @@
* {@link ExecutorThreadModel#getInstance(String)}.
*/
ThreadModel getThreadModel();
-
+
/**
* Sets the default {@link ThreadModel} of the {@link IoService}.
* If you specify <tt>null</tt>, this property will be set to the
@@ -80,9 +80,10 @@
* {@link ExecutorThreadModel#getInstance(String)}.
*/
void setThreadModel( ThreadModel threadModel );
-
+
/**
* Returns a deep clone of this configuration.
*/
+ @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "override"})
Object clone();
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/common/support/BaseIoServiceConfig.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.common.support;
@@ -29,7 +29,7 @@
/**
* A base implementation of {@link IoServiceConfig}.
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -39,17 +39,17 @@
* Current filter chain builder.
*/
private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
-
+
/**
* The default thread model.
*/
private final ThreadModel defaultThreadModel = ExecutorThreadModel.getInstance("AnonymousIoService");
-
+
/**
* Current thread model.
*/
private ThreadModel threadModel = defaultThreadModel;
-
+
public BaseIoServiceConfig()
{
super();
@@ -68,7 +68,7 @@
}
filterChainBuilder = builder;
}
-
+
public DefaultIoFilterChainBuilder getFilterChain()
{
if( filterChainBuilder instanceof DefaultIoFilterChainBuilder )
@@ -81,7 +81,7 @@
"Current filter chain builder is not a DefaultIoFilterChainBuilder." );
}
}
-
+
public ThreadModel getThreadModel()
{
return threadModel;
@@ -97,7 +97,9 @@
}
this.threadModel = threadModel;
}
-
+
+ @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException"})
+ @Override
public Object clone()
{
BaseIoServiceConfig ret;
@@ -109,20 +111,21 @@
{
throw ( InternalError ) new InternalError().initCause( e );
}
-
+
// Try to clone the chain builder.
try
{
- Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone", null );
+ Method cloneMethod = this.filterChainBuilder.getClass().getMethod( "clone" );
if( cloneMethod.isAccessible() )
{
- ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder, null );
+ ret.filterChainBuilder = ( IoFilterChainBuilder ) cloneMethod.invoke( this.filterChainBuilder );
}
}
catch( Exception e )
{
+ // uncloneable
}
-
+
return ret;
}
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/StreamWriteFilter.java Sun Nov 5 11:36:41 2006
@@ -6,50 +6,51 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-import org.apache.mina.util.Queue;
/**
* Filter implementation which makes it possible to write {@link InputStream}
- * objects directly using {@link IoSession#write(Object)}. When an
+ * objects directly using {@link IoSession#write(Object)}. When an
* {@link InputStream} is written to a session this filter will read the bytes
* from the stream into {@link ByteBuffer} objects and write those buffers
* to the next filter. When end of stream has been reached this filter will
* call {@link NextFilter#messageSent(IoSession, Object)} using the original
- * {@link InputStream} written to the session and notifies
- * {@link org.apache.mina.common.WriteFuture} on the
+ * {@link InputStream} written to the session and notifies
+ * {@link org.apache.mina.common.WriteFuture} on the
* original {@link org.apache.mina.common.IoFilter.WriteRequest}.
- * <p>
+ * <p/>
* This filter will ignore written messages which aren't {@link InputStream}
* instances. Such messages will be passed to the next filter directly.
* </p>
- * <p>
+ * <p/>
* NOTE: this filter does not close the stream after all data from stream
* has been written. The {@link org.apache.mina.common.IoHandler} should take
- * care of that in its
- * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)}
+ * care of that in its
+ * {@link org.apache.mina.common.IoHandler#messageSent(IoSession, Object)}
* callback.
* </p>
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -69,32 +70,33 @@
protected static final String INITIAL_WRITE_FUTURE = StreamWriteFilter.class.getName() + ".future";
private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
-
- public void filterWrite( NextFilter nextFilter, IoSession session,
- WriteRequest writeRequest ) throws Exception
+
+ @Override
+ public void filterWrite( NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest ) throws Exception
{
// If we're already processing a stream we need to queue the WriteRequest.
if( session.getAttribute( CURRENT_STREAM ) != null )
{
- Queue queue = ( Queue ) session.getAttribute( WRITE_REQUEST_QUEUE );
+ Queue<WriteRequest> queue = ( Queue<WriteRequest> ) session.getAttribute( WRITE_REQUEST_QUEUE );
if( queue == null )
{
- queue = new Queue();
+ queue = new ConcurrentLinkedQueue<WriteRequest>( );
session.setAttribute( WRITE_REQUEST_QUEUE, queue );
}
- queue.push( writeRequest );
+ queue.add( writeRequest );
return;
}
-
+
Object message = writeRequest.getMessage();
-
+
if( message instanceof InputStream )
{
-
+
InputStream inputStream = ( InputStream ) message;
-
+
ByteBuffer byteBuffer = getNextByteBuffer( inputStream );
- if ( byteBuffer == null )
+ if( byteBuffer == null )
{
// End of stream reached.
writeRequest.getFuture().setWritten( true );
@@ -104,7 +106,7 @@
{
session.setAttribute( CURRENT_STREAM, inputStream );
session.setAttribute( INITIAL_WRITE_FUTURE, writeRequest.getFuture() );
-
+
nextFilter.filterWrite( session, new WriteRequest( byteBuffer ) );
}
@@ -115,10 +117,11 @@
}
}
+ @Override
public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
{
InputStream inputStream = ( InputStream ) session.getAttribute( CURRENT_STREAM );
-
+
if( inputStream == null )
{
nextFilter.messageSent( session, message );
@@ -126,25 +129,25 @@
else
{
ByteBuffer byteBuffer = getNextByteBuffer( inputStream );
-
- if( byteBuffer == null )
+
+ if( byteBuffer == null )
{
// End of stream reached.
session.removeAttribute( CURRENT_STREAM );
WriteFuture writeFuture = ( WriteFuture ) session.removeAttribute( INITIAL_WRITE_FUTURE );
-
+
// Write queued WriteRequests.
- Queue queue = ( Queue ) session.removeAttribute( WRITE_REQUEST_QUEUE );
+ Queue<? extends WriteRequest> queue = ( Queue<? extends WriteRequest> ) session.removeAttribute( WRITE_REQUEST_QUEUE );
if( queue != null )
{
- WriteRequest wr = ( WriteRequest ) queue.pop();
+ WriteRequest wr = queue.poll();
while( wr != null )
{
filterWrite( nextFilter, session, wr );
- wr = ( WriteRequest ) queue.pop();
+ wr = queue.poll();
}
}
-
+
writeFuture.setWritten( true );
nextFilter.messageSent( session, inputStream );
}
@@ -155,32 +158,30 @@
}
}
- private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException
+ private ByteBuffer getNextByteBuffer( InputStream is ) throws IOException
{
byte[] bytes = new byte[ writeBufferSize ];
-
+
int off = 0;
int n = 0;
- while( off < bytes.length &&
- ( n = is.read( bytes, off, bytes.length - off ) ) != -1 )
+ while( off < bytes.length &&
+ ( n = is.read( bytes, off, bytes.length - off ) ) != -1 )
{
off += n;
}
-
+
if( n == -1 && off == 0 )
{
return null;
}
- ByteBuffer buffer = ByteBuffer.wrap( bytes, 0, off );
-
- return buffer;
+ return ByteBuffer.wrap( bytes, 0, off );
}
/**
- * Returns the size of the write buffer in bytes. Data will be read from the
+ * Returns the size of the write buffer in bytes. Data will be read from the
* stream in chunks of this size and then written to the next filter.
- *
+ *
* @return the write buffer size.
*/
public int getWriteBufferSize()
@@ -189,9 +190,9 @@
}
/**
- * Sets the size of the write buffer in bytes. Data will be read from the
+ * Sets the size of the write buffer in bytes. Data will be read from the
* stream in chunks of this size and then written to the next filter.
- *
+ *
* @throws IllegalArgumentException if the specified size is < 1.
*/
public void setWriteBufferSize( int writeBufferSize )
@@ -202,6 +203,6 @@
}
this.writeBufferSize = writeBufferSize;
}
-
-
+
+
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.codec;
@@ -35,7 +35,7 @@
* An {@link IoFilter} which translates binary or protocol specific data into
* message object and vice versa using {@link ProtocolCodecFactory},
* {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -43,12 +43,12 @@
{
public static final String ENCODER = ProtocolCodecFilter.class.getName() + ".encoder";
public static final String DECODER = ProtocolCodecFilter.class.getName() + ".decoder";
-
+
private static final Class[] EMPTY_PARAMS = new Class[0];
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );
private final ProtocolCodecFactory factory;
-
+
public ProtocolCodecFilter( ProtocolCodecFactory factory )
{
if( factory == null )
@@ -57,7 +57,7 @@
}
this.factory = factory;
}
-
+
public ProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
{
if( encoder == null )
@@ -68,7 +68,7 @@
{
throw new NullPointerException( "decoder" );
}
-
+
this.factory = new ProtocolCodecFactory()
{
public ProtocolEncoder getEncoder()
@@ -82,7 +82,7 @@
}
};
}
-
+
public ProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
{
if( encoderClass == null )
@@ -117,7 +117,7 @@
{
throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
}
-
+
this.factory = new ProtocolCodecFactory()
{
public ProtocolEncoder getEncoder() throws Exception
@@ -132,6 +132,7 @@
};
}
+ @Override
public void onPreAdd( IoFilterChain parent, String name, NextFilter nextFilter ) throws Exception
{
if( parent.contains( ProtocolCodecFilter.class ) )
@@ -140,6 +141,7 @@
}
}
+ @Override
public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
{
if( !( message instanceof ByteBuffer ) )
@@ -151,7 +153,7 @@
ByteBuffer in = ( ByteBuffer ) message;
ProtocolDecoder decoder = getDecoder( session );
ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
-
+
try
{
decoder.decode( session, in, decoderOut );
@@ -185,6 +187,7 @@
}
}
+ @Override
public void messageSent( NextFilter nextFilter, IoSession session, Object message ) throws Exception
{
if( message instanceof HiddenByteBuffer )
@@ -200,7 +203,8 @@
nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
}
-
+
+ @Override
public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
{
Object message = writeRequest.getMessage();
@@ -212,7 +216,7 @@
ProtocolEncoder encoder = getEncoder( session );
ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
-
+
try
{
encoder.encode( session, message, encoderOut );
@@ -245,7 +249,8 @@
}
}
}
-
+
+ @Override
public void sessionClosed( NextFilter nextFilter, IoSession session ) throws Exception
{
// Call finishDecode() first when a connection is closed.
@@ -276,7 +281,7 @@
decoderOut.flush();
}
-
+
nextFilter.sessionClosed( session );
}
@@ -290,12 +295,12 @@
}
return encoder;
}
-
+
private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
{
return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
}
-
+
private ProtocolDecoder getDecoder( IoSession session ) throws Exception
{
ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
@@ -306,12 +311,12 @@
}
return decoder;
}
-
+
private ProtocolDecoderOutput getDecoderOut( IoSession session, NextFilter nextFilter )
{
return new SimpleProtocolDecoderOutput( session, nextFilter );
}
-
+
private void disposeEncoder( IoSession session )
{
ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
@@ -361,41 +366,44 @@
super( buf );
}
}
-
+
private static class MessageByteBuffer extends ByteBufferProxy
{
private final Object message;
-
+
private MessageByteBuffer( Object message )
{
super( EMPTY_BUFFER );
this.message = message;
}
+ @Override
public void acquire()
{
// no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
}
+ @Override
public void release()
{
// no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
}
}
-
+
private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput
{
private final IoSession session;
private final NextFilter nextFilter;
private final WriteRequest writeRequest;
-
- public ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
+
+ ProtocolEncoderOutputImpl( IoSession session, NextFilter nextFilter, WriteRequest writeRequest )
{
this.session = session;
this.nextFilter = nextFilter;
this.writeRequest = writeRequest;
}
+ @Override
protected WriteFuture doFlush( ByteBuffer buf )
{
WriteFuture future = new DefaultWriteFuture( session );
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolDecoderOutput.java Sun Nov 5 11:36:41 2006
@@ -6,28 +6,30 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.codec.support;
-import org.apache.mina.common.IoSession;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.IoSession;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.util.Queue;
/**
* A {@link ProtocolDecoderOutput} based on queue.
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*
@@ -36,17 +38,17 @@
{
private final NextFilter nextFilter;
private final IoSession session;
- private final Queue messageQueue = new Queue();
-
+ private final List<Object> messageQueue = new ArrayList<Object>();
+
public SimpleProtocolDecoderOutput( IoSession session, NextFilter nextFilter )
{
this.nextFilter = nextFilter;
this.session = session;
}
-
+
public void write( Object message )
{
- messageQueue.push( message );
+ messageQueue.add( message );
if( session instanceof BaseIoSession )
{
( ( BaseIoSession ) session ).increaseReadMessages();
@@ -57,8 +59,8 @@
{
while( !messageQueue.isEmpty() )
{
- nextFilter.messageReceived( session, messageQueue.pop() );
+ nextFilter.messageReceived( session, messageQueue.remove(0) );
}
-
+
}
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/codec/support/SimpleProtocolEncoderOutput.java Sun Nov 5 11:36:41 2006
@@ -6,23 +6,25 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.codec.support;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.util.Queue;
/**
* A {@link ProtocolEncoderOutput} based on queue.
@@ -32,88 +34,73 @@
*/
public abstract class SimpleProtocolEncoderOutput implements ProtocolEncoderOutput
{
- private final Queue bufferQueue = new Queue();
-
+ private final List<ByteBuffer> bufferQueue = new ArrayList<ByteBuffer>();
+
public SimpleProtocolEncoderOutput()
{
}
-
- public Queue getBufferQueue()
+
+ public List<ByteBuffer> getBufferQueue()
{
return bufferQueue;
}
-
+
public void write( ByteBuffer buf )
{
- bufferQueue.push( buf );
+ bufferQueue.add( buf );
}
-
+
public void mergeAll()
{
- int sum = 0;
final int size = bufferQueue.size();
-
+
if( size < 2 )
{
// no need to merge!
return;
}
-
+
// Get the size of merged BB
+ int sum = 0;
for( int i = size - 1; i >= 0; i -- )
{
- sum += ( ( ByteBuffer ) bufferQueue.get( i ) ).remaining();
+ sum += bufferQueue.get( i ).remaining();
}
-
+
// Allocate a new BB that will contain all fragments
ByteBuffer newBuf = ByteBuffer.allocate( sum );
-
+
// and merge all.
- for( ;; )
+ for( ; !bufferQueue.isEmpty(); )
{
- ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop();
- if( buf == null )
- {
- break;
- }
-
+ ByteBuffer buf = bufferQueue.remove( 0 );
+
newBuf.put( buf );
buf.release();
}
-
+
// Push the new buffer finally.
newBuf.flip();
- bufferQueue.push(newBuf);
+ bufferQueue.add( newBuf );
}
-
+
public WriteFuture flush()
{
- Queue bufferQueue = this.bufferQueue;
WriteFuture future = null;
- if( bufferQueue.isEmpty() )
- {
- return null;
- }
- else
+
+ for( ; !bufferQueue.isEmpty(); )
{
- for( ;; )
+ ByteBuffer buf = bufferQueue.remove( 0 );
+
+ // Flush only when the buffer has remaining.
+ if( buf.hasRemaining() )
{
- ByteBuffer buf = ( ByteBuffer ) bufferQueue.pop();
- if( buf == null )
- {
- break;
- }
-
- // Flush only when the buffer has remaining.
- if( buf.hasRemaining() )
- {
- future = doFlush( buf );
- }
+ future = doFlush( buf );
}
}
-
+
return future;
}
-
+
protected abstract WriteFuture doFlush( ByteBuffer buf );
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Nov 5 11:36:41 2006
@@ -27,21 +27,23 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.mina.util.NamePreservingRunnable;
-import java.util.concurrent.Executor;
+import org.apache.mina.util.NewThreadExecutor;
/**
* {@link IoAcceptor} for socket transport (TCP/IP).
@@ -51,27 +53,21 @@
*/
public class SocketAcceptor extends BaseIoAcceptor
{
- /**
- * @noinspection StaticNonFinalField
- */
- private static volatile int nextId = 0;
+ private static final AtomicInteger nextId = new AtomicInteger();
private final Executor executor;
private final Object lock = new Object();
- private final int id = nextId ++;
+ private final int id = nextId.getAndIncrement();
private final String threadName = "SocketAcceptor-" + id;
private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
- private final Map channels = new HashMap();
+ private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
private final SocketIoProcessor[] ioProcessors;
private final int processorCount;
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
private Selector selector;
private Worker worker;
private int processorDistributor = 0;
@@ -88,7 +84,7 @@
* Create an acceptor with the desired number of processing threads
*
* @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
+ * @param executor Executor to use for launching threads
*/
public SocketAcceptor( int processorCount, Executor executor )
{
@@ -132,10 +128,7 @@
RegistrationRequest request = new RegistrationRequest( address, handler, config );
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
+ registerQueue.add( request );
startupWorker();
@@ -199,10 +192,7 @@
throw new IllegalArgumentException( "Address not bound: " + address );
}
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
+ cancelQueue.add( request );
selector.wakeup();
@@ -231,15 +221,11 @@
public void unbindAll()
{
- List addresses;
- synchronized( channels )
- {
- addresses = new ArrayList( channels.keySet() );
- }
+ List<SocketAddress> addresses = new ArrayList<SocketAddress>( channels.keySet() );
- for( Iterator i = addresses.iterator(); i.hasNext(); )
+ for( SocketAddress address : addresses )
{
- unbind( ( SocketAddress ) i.next() );
+ unbind( address );
}
}
@@ -247,7 +233,7 @@
{
public void run()
{
- Thread.currentThread().setName(SocketAcceptor.this.threadName );
+ Thread.currentThread().setName( SocketAcceptor.this.threadName );
for( ; ; )
{
@@ -306,12 +292,12 @@
}
}
- private void processSessions( Set keys ) throws IOException
+ private void processSessions( Set<SelectionKey> keys ) throws IOException
{
- Iterator it = keys.iterator();
+ Iterator<SelectionKey> it = keys.iterator();
while( it.hasNext() )
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = it.next();
it.remove();
@@ -334,8 +320,8 @@
{
RegistrationRequest req = ( RegistrationRequest ) key.attachment();
SocketSessionImpl session = new SocketSessionImpl(
- SocketAcceptor.this, nextProcessor(), getListeners(),
- req.config, ch, req.handler, req.address );
+ SocketAcceptor.this, nextProcessor(), getListeners(),
+ req.config, ch, req.handler, req.address );
getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
@@ -391,12 +377,7 @@
for( ; ; )
{
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
{
@@ -433,13 +414,10 @@
}
ssc.register( selector, SelectionKey.OP_ACCEPT, req );
- synchronized( channels )
- {
- channels.put( req.address, ssc );
- }
+ channels.put( req.address, ssc );
getListeners().fireServiceActivated(
- this, req.address, req.handler, req.config );
+ this, req.address, req.handler, req.config );
}
catch( IOException e )
{
@@ -479,23 +457,14 @@
for( ; ; )
{
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
+ CancellationRequest request = cancelQueue.poll();
if( request == null )
{
break;
}
- ServerSocketChannel ssc;
- synchronized( channels )
- {
- ssc = ( ServerSocketChannel ) channels.remove( request.address );
- }
+ ServerSocketChannel ssc = channels.remove( request.address );
// close the channel
try
@@ -530,9 +499,9 @@
if( request.exception == null )
{
getListeners().fireServiceDeactivated(
- this, request.address,
- request.registrationRequest.handler,
- request.registrationRequest.config );
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
}
}
}
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Nov 5 11:36:41 2006
@@ -26,8 +26,11 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Iterator;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
@@ -38,10 +41,8 @@
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.common.support.BaseIoConnector;
import org.apache.mina.common.support.DefaultConnectFuture;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.mina.util.NamePreservingRunnable;
-import java.util.concurrent.Executor;
+import org.apache.mina.util.NewThreadExecutor;
/**
* {@link IoConnector} for socket transport (TCP/IP).
@@ -51,23 +52,17 @@
*/
public class SocketConnector extends BaseIoConnector
{
- /**
- * @noinspection StaticNonFinalField
- */
- private static volatile int nextId = 0;
+ private static final AtomicInteger nextId = new AtomicInteger( );
private final Object lock = new Object();
- private final int id = nextId++;
+ private final int id = nextId.getAndIncrement();
private final String threadName = "SocketConnector-" + id;
private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
- private final Queue connectQueue = new Queue();
+ private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>( );
private final SocketIoProcessor[] ioProcessors;
private final int processorCount;
private final Executor executor;
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
private Selector selector;
private Worker worker;
private int processorDistributor = 0;
@@ -218,10 +213,7 @@
}
}
- synchronized( connectQueue )
- {
- connectQueue.push( request );
- }
+ connectQueue.add( request );
selector.wakeup();
return request;
@@ -264,11 +256,7 @@
for( ; ; )
{
- ConnectionRequest req;
- synchronized( connectQueue )
- {
- req = ( ConnectionRequest ) connectQueue.pop();
- }
+ ConnectionRequest req = connectQueue.poll();
if( req == null )
break;
@@ -285,14 +273,10 @@
}
}
- private void processSessions( Set keys )
+ private void processSessions( Set<SelectionKey> keys )
{
- Iterator it = keys.iterator();
-
- while( it.hasNext() )
+ for( SelectionKey key : keys )
{
- SelectionKey key = ( SelectionKey ) it.next();
-
if( !key.isConnectable() )
continue;
@@ -330,15 +314,12 @@
keys.clear();
}
- private void processTimedOutSessions( Set keys )
+ private void processTimedOutSessions( Set<SelectionKey> keys )
{
long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
- while( it.hasNext() )
+ for( SelectionKey key : keys )
{
- SelectionKey key = ( SelectionKey ) it.next();
-
if( !key.isValid() )
continue;
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Sun Nov 5 11:36:41 2006
@@ -6,31 +6,31 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
import java.io.IOException;
+import java.util.Queue;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
/**
* An {@link IoFilterChain} for socket transport (TCP/IP).
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
*/
class SocketFilterChain extends AbstractIoFilterChain {
@@ -40,17 +40,18 @@
super( parent );
}
- protected void doWrite( IoSession session, WriteRequest writeRequest )
+ @Override
+ protected void doWrite( IoSession session, WriteRequest writeRequest )
{
SocketSessionImpl s = ( SocketSessionImpl ) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
+ Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
+
// SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
+ // because the buffer will be passed with messageSent event.
( ( ByteBuffer ) writeRequest.getMessage() ).mark();
synchronized( writeRequestQueue )
{
- writeRequestQueue.push( writeRequest );
+ writeRequestQueue.add( writeRequest );
if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
{
// Notify SocketIoProcessor only when writeRequestQueue was empty.
@@ -59,7 +60,8 @@
}
}
- protected void doClose( IoSession session ) throws IOException
+ @Override
+ protected void doClose( IoSession session ) throws IOException
{
SocketSessionImpl s = ( SocketSessionImpl ) session;
s.getIoProcessor().remove( s );
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Sun Nov 5 11:36:41 2006
@@ -23,8 +23,9 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Iterator;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.ByteBuffer;
@@ -33,7 +34,6 @@
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.WriteTimeoutException;
import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
/**
* Performs all I/O operations for sockets that are connected or bound. This class is used by MINA internally.
@@ -47,15 +47,12 @@
private final String threadName;
private final Executor executor;
- /**
- * @noinspection FieldAccessedSynchronizedAndUnsynchronized
- */
private Selector selector;
- private final Queue newSessions = new Queue();
- private final Queue removingSessions = new Queue();
- private final Queue flushingSessions = new Queue();
- private final Queue trafficControllingSessions = new Queue();
+ private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+ private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
private Worker worker;
private long lastIdleCheckTime = System.currentTimeMillis();
@@ -68,10 +65,7 @@
void addNew( SocketSessionImpl session ) throws IOException
{
- synchronized( newSessions )
- {
- newSessions.push( session );
- }
+ newSessions.add( session );
startupWorker();
@@ -120,41 +114,24 @@
private void scheduleRemove( SocketSessionImpl session )
{
- synchronized( removingSessions )
- {
- removingSessions.push( session );
- }
+ removingSessions.add( session );
}
private void scheduleFlush( SocketSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
+ flushingSessions.add( session );
}
private void scheduleTrafficControl( SocketSessionImpl session )
{
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.push( session );
- }
+ trafficControllingSessions.add( session );
}
private void doAddNew()
{
- if( newSessions.isEmpty() )
- return;
-
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( newSessions )
- {
- session = ( SocketSessionImpl ) newSessions.pop();
- }
+ SocketSessionImpl session = newSessions.poll();
if( session == null )
break;
@@ -164,8 +141,8 @@
{
ch.configureBlocking( false );
session.setSelectionKey( ch.register( selector,
- SelectionKey.OP_READ,
- session ) );
+ SelectionKey.OP_READ,
+ session ) );
// AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
@@ -182,17 +159,9 @@
private void doRemove()
{
- if( removingSessions.isEmpty() )
- return;
-
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( removingSessions )
- {
- session = ( SocketSessionImpl ) removingSessions.pop();
- }
+ SocketSessionImpl session = removingSessions.poll();
if( session == null )
break;
@@ -229,13 +198,10 @@
}
}
- private void process( Set selectedKeys )
+ private void process( Set<SelectionKey> selectedKeys )
{
- Iterator it = selectedKeys.iterator();
-
- while( it.hasNext() )
+ for( SelectionKey key : selectedKeys )
{
- SelectionKey key = ( SelectionKey ) it.next();
SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
if( key.isReadable() && session.getTrafficMask().isReadable() )
@@ -308,12 +274,11 @@
if( ( currentTime - lastIdleCheckTime ) >= 1000 )
{
lastIdleCheckTime = currentTime;
- Set keys = selector.keys();
+ Set<SelectionKey> keys = selector.keys();
if( keys != null )
{
- for( Iterator it = keys.iterator(); it.hasNext(); )
+ for( SelectionKey key : keys )
{
- SelectionKey key = ( SelectionKey ) it.next();
SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
notifyIdleness( session, currentTime );
}
@@ -371,17 +336,9 @@
private void doFlush()
{
- if( flushingSessions.size() == 0 )
- return;
-
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = ( SocketSessionImpl ) flushingSessions.pop();
- }
+ SocketSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -420,10 +377,10 @@
private void releaseWriteBuffers( SocketSessionImpl session )
{
- Queue writeRequestQueue = session.getWriteRequestQueue();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
- while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
+ while( ( req = writeRequestQueue.poll() ) != null )
{
try
{
@@ -447,16 +404,11 @@
key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
SocketChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
for( ; ; )
{
- WriteRequest req;
-
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
+ WriteRequest req = writeRequestQueue.peek();
if( req == null )
break;
@@ -464,10 +416,7 @@
ByteBuffer buf = ( ByteBuffer ) req.getMessage();
if( buf.remaining() == 0 )
{
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
+ writeRequestQueue.poll();
session.increaseWrittenMessages();
@@ -498,12 +447,7 @@
for( ; ; )
{
- SocketSessionImpl session;
-
- synchronized( trafficControllingSessions )
- {
- session = ( SocketSessionImpl ) trafficControllingSessions.pop();
- }
+ SocketSessionImpl session = trafficControllingSessions.poll();
if( session == null )
break;
@@ -526,7 +470,7 @@
// The normal is OP_READ and, if there are write requests in the
// session's write queue, set OP_WRITE to trigger flushing.
int ops = SelectionKey.OP_READ;
- Queue writeRequestQueue = session.getWriteRequestQueue();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
synchronized( writeRequestQueue )
{
if( !writeRequestQueue.isEmpty() )
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sun Nov 5 11:36:41 2006
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.transport.socket.nio;
@@ -23,7 +23,10 @@
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -32,11 +35,10 @@
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.BaseIoSessionConfig;
import org.apache.mina.common.support.IoServiceListenerSupport;
-import org.apache.mina.util.Queue;
/**
* An {@link IoSession} for socket transport (TCP/IP).
@@ -52,7 +54,7 @@
private final SocketIoProcessor ioProcessor;
private final SocketFilterChain filterChain;
private final SocketChannel ch;
- private final Queue writeRequestQueue;
+ private final Queue<WriteRequest> writeRequestQueue;
private final IoHandler handler;
private final SocketAddress remoteAddress;
private final SocketAddress localAddress;
@@ -77,7 +79,7 @@
this.ioProcessor = ioProcessor;
this.filterChain = new SocketFilterChain( this );
this.ch = ch;
- this.writeRequestQueue = new Queue();
+ this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
this.handler = defaultHandler;
this.remoteAddress = ch.socket().getRemoteSocketAddress();
this.localAddress = ch.socket().getLocalSocketAddress();
@@ -109,7 +111,7 @@
{
return manager;
}
-
+
public IoServiceConfig getServiceConfig()
{
return serviceConfig;
@@ -149,18 +151,19 @@
{
this.key = key;
}
-
+
public IoHandler getHandler()
{
return handler;
}
+ @Override
protected void close0()
{
filterChain.fireFilterClose( this );
}
- Queue getWriteRequestQueue()
+ Queue<WriteRequest> getWriteRequestQueue()
{
return writeRequestQueue;
}
@@ -173,14 +176,25 @@
}
}
+ /**
+ * Returns the sum of the '<tt>remaining</tt>' bytes of the {@link ByteBuffer}
+ * messages held by the requests in the write request queue.
+ *
+ * @throws ClassCastException if a queued request's message is not a {@link ByteBuffer}
+ */
public int getScheduledWriteBytes()
{
- synchronized( writeRequestQueue )
+ int byteSize = 0;
+
+ for( WriteRequest request : writeRequestQueue )
{
- return writeRequestQueue.byteSize();
+ byteSize += ( ( ByteBuffer ) request.getMessage() ).remaining();
}
+
+ return byteSize;
}
+ @Override
protected void write0( WriteRequest writeRequest )
{
filterChain.fireFilterWrite( this, writeRequest );
@@ -206,6 +220,7 @@
return serviceAddress;
}
+ @Override
protected void updateTrafficMask()
{
this.ioProcessor.updateTrafficMask( this );
@@ -216,6 +231,7 @@
return readBufferSize;
}
+ @SuppressWarnings( {"CloneableClassWithoutClone"} )
private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
{
public boolean isKeepAlive()
Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=471502&r1=471501&r2=471502
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Sun Nov 5 11:36:41 2006
@@ -26,29 +26,31 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionRecycler;
-import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.support.BaseIoAcceptor;
import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.Queue;
-
-import java.util.concurrent.Executor;
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -58,17 +60,18 @@
*/
public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramService
{
- private static volatile int nextId = 0;
+ private static final AtomicInteger nextId = new AtomicInteger();
+ private final Object lock = new Object();
private final IoAcceptor wrapper;
private final Executor executor;
- private final int id = nextId ++ ;
+ private final int id = nextId.getAndIncrement();
private Selector selector;
private DatagramAcceptorConfig defaultConfig = new DatagramAcceptorConfig();
- private final Map channels = new HashMap();
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
- private final Queue flushingSessions = new Queue();
+ private final Map<SocketAddress, DatagramChannel> channels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
+ private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
+ private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
+ private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
private Worker worker;
/**
@@ -81,7 +84,7 @@
}
public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config )
- throws IOException
+ throws IOException
{
if( handler == null )
throw new NullPointerException( "handler" );
@@ -92,17 +95,12 @@
if( address != null && !( address instanceof InetSocketAddress ) )
throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
+ + address.getClass() );
RegistrationRequest request = new RegistrationRequest( address, handler, config );
- synchronized( this )
- {
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- startupWorker();
- }
+ registerQueue.add( request );
+ startupWorker();
+
selector.wakeup();
synchronized( request )
@@ -115,6 +113,7 @@
}
catch( InterruptedException e )
{
+ throw new RuntimeIOException( e );
}
}
}
@@ -131,26 +130,20 @@
throw new NullPointerException( "address" );
CancellationRequest request = new CancellationRequest( address );
- synchronized( this )
+ try
{
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
+ startupWorker();
}
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ cancelQueue.add( request );
selector.wakeup();
synchronized( request )
@@ -163,30 +156,28 @@
}
catch( InterruptedException e )
{
+ throw new RuntimeIOException( e );
}
}
}
if( request.exception != null )
{
- throw new RuntimeException( "Failed to unbind" , request.exception );
+ throw new RuntimeException( "Failed to unbind", request.exception );
}
}
public void unbindAll()
{
- List addresses;
- synchronized( channels )
- {
- addresses = new ArrayList( channels.keySet() );
- }
+ List<SocketAddress> addresses = new ArrayList<SocketAddress>( channels.keySet() );
- for( Iterator i = addresses.iterator(); i.hasNext(); )
+ for( SocketAddress address : addresses )
{
- unbind( ( SocketAddress ) i.next() );
+ unbind( address );
}
}
+ @Override
public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
{
if( remoteAddress == null )
@@ -199,7 +190,7 @@
}
Selector selector = this.selector;
- DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
+ DatagramChannel ch = channels.get( localAddress );
if( selector == null || ch == null )
{
throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
@@ -214,7 +205,7 @@
RegistrationRequest req = ( RegistrationRequest ) key.attachment();
IoSession session;
IoSessionRecycler sessionRecycler = getSessionRecycler( req );
- synchronized ( sessionRecycler )
+ synchronized( sessionRecycler )
{
session = sessionRecycler.recycle( localAddress, remoteAddress );
if( session != null )
@@ -224,9 +215,9 @@
// If a new session needs to be created.
DatagramSessionImpl datagramSession = new DatagramSessionImpl(
- wrapper, this,
- req.config, ch, req.handler,
- req.address );
+ wrapper, this,
+ req.config, ch, req.handler,
+ req.address );
datagramSession.setRemoteAddress( remoteAddress );
datagramSession.setSelectionKey( key );
@@ -261,6 +252,7 @@
return sessionRecycler;
}
+ @Override
public IoServiceListenerSupport getListeners()
{
return super.getListeners();
@@ -293,13 +285,16 @@
this.defaultConfig = defaultConfig;
}
- private synchronized void startupWorker() throws IOException
+ private void startupWorker() throws IOException
{
- if( worker == null )
+ synchronized( lock )
{
- selector = Selector.open();
- worker = new Worker();
- executor.execute( new NamePreservingRunnable( worker ) );
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute( new NamePreservingRunnable( worker ) );
+ }
}
}
@@ -319,10 +314,7 @@
private void scheduleFlush( DatagramSessionImpl session )
{
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
+ flushingSessions.add( session );
}
private class Worker implements Runnable
@@ -331,7 +323,7 @@
{
Thread.currentThread().setName( "DatagramAcceptor-" + id );
- for( ;; )
+ for( ; ; )
{
try
{
@@ -349,7 +341,7 @@
if( selector.keys().isEmpty() )
{
- synchronized( DatagramAcceptorDelegate.this )
+ synchronized( lock )
{
if( selector.keys().isEmpty() &&
registerQueue.isEmpty() &&
@@ -383,18 +375,19 @@
}
catch( InterruptedException e1 )
{
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
}
}
}
}
}
- private void processReadySessions( Set keys )
+ private void processReadySessions( Set<SelectionKey> keys )
{
- Iterator it = keys.iterator();
+ Iterator<SelectionKey> it = keys.iterator();
while( it.hasNext() )
{
- SelectionKey key = ( SelectionKey ) it.next();
+ SelectionKey key = it.next();
it.remove();
DatagramChannel ch = ( DatagramChannel ) key.channel();
@@ -409,10 +402,9 @@
if( key.isWritable() )
{
- for( Iterator i = getManagedSessions( req.address ).iterator();
- i.hasNext(); )
+ for( Object o : getManagedSessions( req.address ) )
{
- scheduleFlush( ( DatagramSessionImpl ) i.next() );
+ scheduleFlush( ( DatagramSessionImpl ) o );
}
}
}
@@ -426,7 +418,7 @@
private void readSession( DatagramChannel channel, RegistrationRequest req ) throws Exception
{
ByteBuffer readBuf = ByteBuffer.allocate(
- ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
+ ( ( DatagramSessionConfig ) req.config.getSessionConfig() ).getReceiveBufferSize() );
try
{
SocketAddress remoteAddress = channel.receive(
@@ -457,14 +449,9 @@
if( flushingSessions.size() == 0 )
return;
- for( ;; )
+ for( ; ; )
{
- DatagramSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = ( DatagramSessionImpl ) flushingSessions.pop();
- }
+ DatagramSessionImpl session = flushingSessions.poll();
if( session == null )
break;
@@ -484,15 +471,11 @@
{
DatagramChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
+ Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for( ;; )
+ for( ; ; )
{
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
+ WriteRequest req = writeRequestQueue.peek();
if( req == null )
break;
@@ -501,14 +484,11 @@
if( buf.remaining() == 0 )
{
// pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
+ writeRequestQueue.poll();
session.increaseWrittenMessages();
buf.reset();
- ( ( DatagramFilterChain ) session.getFilterChain() ).fireMessageSent( session, req );
+ session.getFilterChain().fireMessageSent( session, req );
continue;
}
@@ -539,13 +519,10 @@
else if( writtenBytes > 0 )
{
key.interestOps( key.interestOps()
- & ( ~SelectionKey.OP_WRITE ) );
+ & ( ~SelectionKey.OP_WRITE ) );
// pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
+ writeRequestQueue.poll();
session.increaseWrittenBytes( writtenBytes );
session.increaseWrittenMessages();
@@ -560,13 +537,9 @@
if( registerQueue.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
+ RegistrationRequest req = registerQueue.poll();
if( req == null )
break;
@@ -602,13 +575,10 @@
req.address = ( InetSocketAddress ) ch.socket().getLocalSocketAddress();
}
ch.register( selector, SelectionKey.OP_READ, req );
- synchronized( channels )
- {
- channels.put( req.address, ch );
- }
+ channels.put( req.address, ch );
getListeners().fireServiceActivated(
- this, req.address, req.handler, req.config);
+ this, req.address, req.handler, req.config );
}
catch( Throwable t )
{
@@ -643,24 +613,16 @@
if( cancelQueue.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
- CancellationRequest request;
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
+ CancellationRequest request = cancelQueue.poll();
if( request == null )
{
break;
}
- DatagramChannel ch;
- synchronized( channels )
- {
- ch = ( DatagramChannel ) channels.remove( request.address );
- }
+ DatagramChannel ch = channels.remove( request.address );
// close the channel
try
@@ -668,7 +630,7 @@
if( ch == null )
{
request.exception = new IllegalArgumentException(
- "Address not bound: " + request.address );
+ "Address not bound: " + request.address );
}
else
{
@@ -695,9 +657,9 @@
if( request.exception == null )
{
getListeners().fireServiceDeactivated(
- this, request.address,
- request.registrationRequest.handler,
- request.registrationRequest.config );
+ this, request.address,
+ request.registrationRequest.handler,
+ request.registrationRequest.config );
}
}
}
|