directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pro...@apache.org
Subject svn commit: r471593 - /directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
Date Mon, 06 Nov 2006 00:58:18 GMT
Author: proyal
Date: Sun Nov  5 16:58:17 2006
New Revision: 471593

URL: http://svn.apache.org/viewvc?view=rev&rev=471593
Log:
Remove synchronization; use an enum + switch statement for dispatching

Modified:
    directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java

Modified: directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: http://svn.apache.org/viewvc/directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=471593&r1=471592&r2=471593
==============================================================================
--- directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
(original)
+++ directory/branches/mina/1.2/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
Sun Nov  5 16:58:17 2006
@@ -19,8 +19,13 @@
  */
 package org.apache.mina.filter.executor;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoFilterAdapter;
@@ -31,22 +36,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A filter that forward events to {@link Executor} in
  * <a href="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">backport-util-concurrent</a>.
  * You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
  * This filter is usually inserted by {@link ThreadModel} automatically, so you don't need
  * to add this filter in most cases.
- * <p>
+ * <p/>
  * Please note that this filter doesn't manage the life cycle of the underlying
  * {@link Executor}.  You have to destroy or stop it by yourself.
- *
+ * <p/>
  * <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ *
  * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
  */
 public class ExecutorFilter extends IoFilterAdapter
@@ -60,7 +61,7 @@
      */
     public ExecutorFilter()
     {
-        this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()
) );
+        this( new ThreadPoolExecutor( 16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()
) );
     }
 
     /**
@@ -90,18 +91,16 @@
         Event event = new Event( type, nextFilter, data );
         SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
 
-        synchronized( buf.eventQueue )
+        buf.eventQueue.add( event );
+
+        if( buf.processingCompleted.compareAndSet( true, false ) )
         {
-            buf.eventQueue.add( event );
-            if( buf.processingCompleted )
+            if( logger.isDebugEnabled() )
             {
-                buf.processingCompleted = false;
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "Launching thread for " + session.getRemoteAddress() );
-                }
-
-                executor.execute( new ProcessEventsRunnable( buf ) );
+                logger.debug( "Launching thread for " + session.getRemoteAddress() );
             }
+
+            executor.execute( new ProcessEventsRunnable( buf ) );
         }
     }
 
@@ -124,8 +123,8 @@
         }
 
         private final IoSession session;
-        private final List eventQueue = new ArrayList();
-        private boolean processingCompleted = true;
+        private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
+        private AtomicBoolean processingCompleted = new AtomicBoolean( true );
 
         private SessionBuffer( IoSession session )
         {
@@ -133,35 +132,16 @@
         }
     }
 
-    protected static class EventType
+    protected static enum EventType
     {
-        public static final EventType OPENED = new EventType( "OPENED" );
-
-        public static final EventType CLOSED = new EventType( "CLOSED" );
-
-        public static final EventType READ = new EventType( "READ" );
-
-        public static final EventType WRITTEN = new EventType( "WRITTEN" );
-
-        public static final EventType RECEIVED = new EventType( "RECEIVED" );
-
-        public static final EventType SENT = new EventType( "SENT" );
-
-        public static final EventType IDLE = new EventType( "IDLE" );
-
-        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
-
-        private final String value;
-
-        private EventType( String value )
-        {
-            this.value = value;
-        }
-
-        public String toString()
-        {
-            return value;
-        }
+        OPENED,
+        CLOSED,
+        READ,
+        WRITTEN,
+        RECEIVED,
+        SENT,
+        IDLE,
+        EXCEPTION
     }
 
     protected static class Event
@@ -193,35 +173,41 @@
         }
     }
 
+    @Override
     public void sessionCreated( NextFilter nextFilter, IoSession session )
     {
         nextFilter.sessionCreated( session );
     }
 
+    @Override
     public void sessionOpened( NextFilter nextFilter,
                                IoSession session )
     {
         fireEvent( nextFilter, session, EventType.OPENED, null );
     }
 
+    @Override
     public void sessionClosed( NextFilter nextFilter,
                                IoSession session )
     {
         fireEvent( nextFilter, session, EventType.CLOSED, null );
     }
 
+    @Override
     public void sessionIdle( NextFilter nextFilter,
                              IoSession session, IdleStatus status )
     {
         fireEvent( nextFilter, session, EventType.IDLE, status );
     }
 
+    @Override
     public void exceptionCaught( NextFilter nextFilter,
                                  IoSession session, Throwable cause )
     {
         fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
     }
 
+    @Override
     public void messageReceived( NextFilter nextFilter,
                                  IoSession session, Object message )
     {
@@ -229,6 +215,7 @@
         fireEvent( nextFilter, session, EventType.RECEIVED, message );
     }
 
+    @Override
     public void messageSent( NextFilter nextFilter,
                              IoSession session, Object message )
     {
@@ -238,39 +225,38 @@
 
     protected void processEvent( NextFilter nextFilter, IoSession session, EventType type,
Object data )
     {
-        if( type == EventType.RECEIVED )
-        {
-            nextFilter.messageReceived( session, data );
-            ByteBufferUtil.releaseIfPossible( data );
-        }
-        else if( type == EventType.SENT )
+        switch( type )
         {
-            nextFilter.messageSent( session, data );
-            ByteBufferUtil.releaseIfPossible( data );
-        }
-        else if( type == EventType.EXCEPTION )
-        {
-            nextFilter.exceptionCaught( session, (Throwable)data );
-        }
-        else if( type == EventType.IDLE )
-        {
-            nextFilter.sessionIdle( session, (IdleStatus)data );
-        }
-        else if( type == EventType.OPENED )
-        {
-            nextFilter.sessionOpened( session );
-        }
-        else if( type == EventType.CLOSED )
-        {
-            nextFilter.sessionClosed( session );
+            case RECEIVED:
+                nextFilter.messageReceived( session, data );
+                ByteBufferUtil.releaseIfPossible( data );
+                break;
+            case SENT:
+                nextFilter.messageSent( session, data );
+                ByteBufferUtil.releaseIfPossible( data );
+                break;
+            case EXCEPTION:
+                nextFilter.exceptionCaught( session, ( Throwable ) data );
+                break;
+            case IDLE:
+                nextFilter.sessionIdle( session, ( IdleStatus ) data );
+                break;
+            case OPENED:
+                nextFilter.sessionOpened( session );
+                break;
+            case CLOSED:
+                nextFilter.sessionClosed( session );
+                break;
         }
     }
 
+    @Override
     public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest
)
     {
         nextFilter.filterWrite( session, writeRequest );
     }
 
+    @Override
     public void filterClose( NextFilter nextFilter, IoSession session ) throws Exception
     {
         nextFilter.filterClose( session );
@@ -289,23 +275,19 @@
         {
             while( true )
             {
-                Event event;
+                Event event = buffer.eventQueue.poll();
 
-                synchronized( buffer.eventQueue )
+                if( null == event )
                 {
-                    if( buffer.eventQueue.isEmpty() )
-                    {
-                        buffer.processingCompleted = true;
-                        break;
-                    }
-
-                    event = ( Event ) buffer.eventQueue.remove( 0 );
+                    buffer.processingCompleted.compareAndSet( false, true );
+                    break;
                 }
 
                 processEvent( event.getNextFilter(), buffer.session, event.getType(), event.getData()
);
             }
 
-            if ( logger.isDebugEnabled() ) {
+            if( logger.isDebugEnabled() )
+            {
                 logger.debug( "Exiting since queue is empty for " + buffer.session.getRemoteAddress()
);
             }
         }



Mime
View raw message