mina-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jvermill...@apache.org
Subject svn commit: r551087 - /mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Date Wed, 27 Jun 2007 07:44:07 GMT
Author: jvermillard
Date: Wed Jun 27 00:44:06 2007
New Revision: 551087

URL: http://svn.apache.org/viewvc?view=rev&rev=551087
Log:
code formatting

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=551087&r1=551086&r2=551087
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Wed Jun 27 00:44:06 2007
@@ -46,162 +46,135 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$,
  */
-class SocketIoProcessor
-{
+class SocketIoProcessor {
     private final Object lock = new Object();
 
     private final String threadName;
+
     private final Executor executor;
+
     private final Selector selector;
 
     private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+
     private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+
     private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
+
     private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
 
     private Worker worker;
+
     private long lastIdleCheckTime = System.currentTimeMillis();
 
-    SocketIoProcessor( String threadName, Executor executor )
-    {
+    SocketIoProcessor(String threadName, Executor executor) {
         this.threadName = threadName;
         this.executor = executor;
-        try
-        {
+        try {
             this.selector = Selector.open();
-        }
-        catch( IOException e )
-        {
-            throw new RuntimeIOException( "Failed to open a selector.", e );
+        } catch (IOException e) {
+            throw new RuntimeIOException("Failed to open a selector.", e);
         }
     }
-    
+
     @Override
-    protected void finalize() throws Throwable
-    {
+    protected void finalize() throws Throwable {
         super.finalize();
-        try
-        {
+        try {
             selector.close();
-        }
-        catch( IOException e )
-        {
-            ExceptionMonitor.getInstance().exceptionCaught( e );
+        } catch (IOException e) {
+            ExceptionMonitor.getInstance().exceptionCaught(e);
         }
     }
 
-    void addNew( SocketSessionImpl session )
-    {
-        newSessions.offer( session );
+    void addNew(SocketSessionImpl session) {
+        newSessions.offer(session);
 
         startupWorker();
     }
 
-    void remove( SocketSessionImpl session )
-    {
-        scheduleRemove( session );
+    void remove(SocketSessionImpl session) {
+        scheduleRemove(session);
         startupWorker();
     }
 
-    private void startupWorker()
-    {
-        synchronized( lock )
-        {
-            if( worker == null )
-            {
+    private void startupWorker() {
+        synchronized (lock) {
+            if (worker == null) {
                 worker = new Worker();
-                executor.execute( new NamePreservingRunnable( worker ) );
+                executor.execute(new NamePreservingRunnable(worker));
             }
         }
         selector.wakeup();
     }
 
-    void flush( SocketSessionImpl session )
-    {
-        scheduleFlush( session );
+    void flush(SocketSessionImpl session) {
+        scheduleFlush(session);
         Selector selector = this.selector;
-        if( selector != null )
-        {
+        if (selector != null) {
             selector.wakeup();
         }
     }
 
-    void updateTrafficMask( SocketSessionImpl session )
-    {
-        scheduleTrafficControl( session );
+    void updateTrafficMask(SocketSessionImpl session) {
+        scheduleTrafficControl(session);
         Selector selector = this.selector;
-        if( selector != null )
-        {
+        if (selector != null) {
             selector.wakeup();
         }
     }
 
-    private void scheduleRemove( SocketSessionImpl session )
-    {
-        removingSessions.offer( session );
+    private void scheduleRemove(SocketSessionImpl session) {
+        removingSessions.offer(session);
     }
 
-    private void scheduleFlush( SocketSessionImpl session )
-    {
-        flushingSessions.offer( session );
+    private void scheduleFlush(SocketSessionImpl session) {
+        flushingSessions.offer(session);
     }
 
-    private void scheduleTrafficControl( SocketSessionImpl session )
-    {
-        trafficControllingSessions.offer( session );
+    private void scheduleTrafficControl(SocketSessionImpl session) {
+        trafficControllingSessions.offer(session);
     }
 
-    private void doAddNew()
-    {
-        for( ; ; )
-        {
+    private void doAddNew() {
+        for (;;) {
             SocketSessionImpl session = newSessions.poll();
 
-            if( session == null ) {
+            if (session == null) {
                 break;
             }
 
             SocketChannel ch = session.getChannel();
-            try
-            {
-                ch.configureBlocking( false );
-                session.setSelectionKey( ch.register( selector,
-                                                      SelectionKey.OP_READ,
-                                                      session ) );
+            try {
+                ch.configureBlocking(false);
+                session.setSelectionKey(ch.register(selector,
+                        SelectionKey.OP_READ, session));
 
                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
                 // in AbstractIoFilterChain.fireSessionOpened().
-                getServiceListeners( session ).fireSessionCreated( session );
-            }
-            catch( IOException e )
-            {
+                getServiceListeners(session).fireSessionCreated(session);
+            } catch (IOException e) {
                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
                 // and call ConnectFuture.setException().
-                session.getFilterChain().fireExceptionCaught( session, e );
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
-    
-    private IoServiceListenerSupport getServiceListeners( IoSession session )
-    {
+
+    private IoServiceListenerSupport getServiceListeners(IoSession session) {
         IoService service = session.getService();
-        if( service instanceof SocketAcceptor )
-        {
-            return ( ( SocketAcceptor ) service ).getListeners();
-        }
-        else
-        {
-            return ( ( SocketConnector ) service ).getListeners();
+        if (service instanceof SocketAcceptor) {
+            return ((SocketAcceptor) service).getListeners();
+        } else {
+            return ((SocketConnector) service).getListeners();
         }
     }
 
-    private void doRemove()
-    {
-        for( ; ; )
-        {
+    private void doRemove() {
+        for (;;) {
             SocketSessionImpl session = removingSessions.poll();
 
-            if( session == null ) {
+            if (session == null) {
                 break;
             }
 
@@ -209,291 +182,237 @@
             SelectionKey key = session.getSelectionKey();
             // Retry later if session is not yet fully initialized.
             // (In case that Session.close() is called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleRemove( session );
+            if (key == null) {
+                scheduleRemove(session);
                 break;
             }
             // skip if channel is already closed
-            if( !key.isValid() )
-            {
+            if (!key.isValid()) {
                 continue;
             }
 
-            try
-            {
+            try {
                 key.cancel();
                 ch.close();
-            }
-            catch( IOException e )
-            {
-                session.getFilterChain().fireExceptionCaught( session, e );
-            }
-            finally
-            {
-                clearWriteRequestQueue( session );
-                getServiceListeners( session ).fireSessionDestroyed( session );
+            } catch (IOException e) {
+                session.getFilterChain().fireExceptionCaught(session, e);
+            } finally {
+                clearWriteRequestQueue(session);
+                getServiceListeners(session).fireSessionDestroyed(session);
             }
         }
     }
 
-    private void process( Set<SelectionKey> selectedKeys )
-    {
+    private void process(Set<SelectionKey> selectedKeys) {
         Iterator<SelectionKey> it = selectedKeys.iterator();
 
-        while( it.hasNext() )
-        {
+        while (it.hasNext()) {
             SelectionKey key = it.next();
-            SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+            SocketSessionImpl session = (SocketSessionImpl) key.attachment();
 
-            if( key.isReadable() && session.getTrafficMask().isReadable() )
-            {
-                read( session );
+            if (key.isReadable() && session.getTrafficMask().isReadable()) {
+                read(session);
             }
 
-            if( key.isWritable() && session.getTrafficMask().isWritable() )
-            {
-                scheduleFlush( session );
+            if (key.isWritable() && session.getTrafficMask().isWritable()) {
+                scheduleFlush(session);
             }
         }
 
         selectedKeys.clear();
     }
 
-    private void read( SocketSessionImpl session )
-    {
-        ByteBuffer buf = ByteBuffer.allocate( session.getReadBufferSize() );
+    private void read(SocketSessionImpl session) {
+        ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
         SocketChannel ch = session.getChannel();
 
-        try
-        {
+        try {
             buf.clear();
 
             int readBytes = 0;
             int ret;
 
-            try
-            {
-                while( ( ret = ch.read( buf.buf() ) ) > 0 )
-                {
+            try {
+                while ((ret = ch.read(buf.buf())) > 0) {
                     readBytes += ret;
                 }
-            }
-            finally
-            {
+            } finally {
                 buf.flip();
             }
 
-            session.increaseReadBytes( readBytes );
+            session.increaseReadBytes(readBytes);
 
-            if( readBytes > 0 )
-            {
-                session.getFilterChain().fireMessageReceived( session, buf );
+            if (readBytes > 0) {
+                session.getFilterChain().fireMessageReceived(session, buf);
                 buf = null;
             }
-            if( ret < 0 )
-            {
-                scheduleRemove( session );
+            if (ret < 0) {
+                scheduleRemove(session);
             }
-        }
-        catch( Throwable e )
-        {
-            if( e instanceof IOException ) {
-                scheduleRemove( session );
+        } catch (Throwable e) {
+            if (e instanceof IOException) {
+                scheduleRemove(session);
             }
-            session.getFilterChain().fireExceptionCaught( session, e );
+            session.getFilterChain().fireExceptionCaught(session, e);
         }
     }
 
-    private void notifyIdleness()
-    {
+    private void notifyIdleness() {
         // process idle sessions
         long currentTime = System.currentTimeMillis();
-        if( ( currentTime - lastIdleCheckTime ) >= 1000 )
-        {
+        if ((currentTime - lastIdleCheckTime) >= 1000) {
             lastIdleCheckTime = currentTime;
             Set<SelectionKey> keys = selector.keys();
-            if( keys != null )
-            {
+            if (keys != null) {
                 for (SelectionKey key : keys) {
-                    SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
-                    notifyIdleness( session, currentTime );
+                    SocketSessionImpl session = (SocketSessionImpl) key
+                            .attachment();
+                    notifyIdleness(session, currentTime);
                 }
             }
         }
     }
 
-    private void notifyIdleness( SocketSessionImpl session, long currentTime )
-    {
-        notifyIdleness0(
-            session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
-            IdleStatus.BOTH_IDLE,
-            Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
-        notifyIdleness0(
-            session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
-            IdleStatus.READER_IDLE,
-            Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
-        notifyIdleness0(
-            session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
-            IdleStatus.WRITER_IDLE,
-            Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
-
-        notifyWriteTimeout( session, currentTime, session
-            .getWriteTimeoutInMillis(), session.getLastWriteTime() );
-    }
-
-    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
-                                  long idleTime, IdleStatus status,
-                                  long lastIoTime )
-    {
-        if( idleTime > 0 && lastIoTime != 0
-            && ( currentTime - lastIoTime ) >= idleTime )
-        {
-            session.increaseIdleCount( status );
-            session.getFilterChain().fireSessionIdle( session, status );
+    private void notifyIdleness(SocketSessionImpl session, long currentTime) {
+        notifyIdleness0(session, currentTime, session
+                .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
+                        .getLastIdleTime(IdleStatus.BOTH_IDLE)));
+        notifyIdleness0(session, currentTime, session
+                .getIdleTimeInMillis(IdleStatus.READER_IDLE),
+                IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
+                        session.getLastIdleTime(IdleStatus.READER_IDLE)));
+        notifyIdleness0(session, currentTime, session
+                .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+                IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
+                        session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
+
+        notifyWriteTimeout(session, currentTime, session
+                .getWriteTimeoutInMillis(), session.getLastWriteTime());
+    }
+
+    private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+            long idleTime, IdleStatus status, long lastIoTime) {
+        if (idleTime > 0 && lastIoTime != 0
+                && (currentTime - lastIoTime) >= idleTime) {
+            session.increaseIdleCount(status);
+            session.getFilterChain().fireSessionIdle(session, status);
         }
     }
 
-    private void notifyWriteTimeout( SocketSessionImpl session,
-                                     long currentTime,
-                                     long writeTimeout, long lastIoTime )
-    {
+    private void notifyWriteTimeout(SocketSessionImpl session,
+            long currentTime, long writeTimeout, long lastIoTime) {
         SelectionKey key = session.getSelectionKey();
-        if( writeTimeout > 0
-            && ( currentTime - lastIoTime ) >= writeTimeout
-            && key != null && key.isValid()
-            && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
-        {
-            session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+        if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
+                && key != null && key.isValid()
+                && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
+            session.getFilterChain().fireExceptionCaught(session,
+                    new WriteTimeoutException());
         }
     }
 
-    private void doFlush()
-    {
-        if( flushingSessions.size() == 0 ) {
+    private void doFlush() {
+        if (flushingSessions.size() == 0) {
             return;
         }
 
-        for( ; ; )
-        {
+        for (;;) {
             SocketSessionImpl session = flushingSessions.poll();
 
-            if( session == null ) {
+            if (session == null) {
                 break;
             }
 
-            if( !session.isConnected() )
-            {
-                clearWriteRequestQueue( session );
+            if (!session.isConnected()) {
+                clearWriteRequestQueue(session);
                 continue;
             }
 
-            
             SelectionKey key = session.getSelectionKey();
             // Retry later if session is not yet fully initialized.
             // (In case that Session.write() is called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleFlush( session );
+            if (key == null) {
+                scheduleFlush(session);
                 break;
             }
 
             // Skip if the channel is already closed.
-            if( !key.isValid() )
-            {
+            if (!key.isValid()) {
                 continue;
             }
 
-            try
-            {
-                doFlush( session );
-            }
-            catch( IOException e )
-            {
-                scheduleRemove( session );
-                session.getFilterChain().fireExceptionCaught( session, e );
+            try {
+                doFlush(session);
+            } catch (IOException e) {
+                scheduleRemove(session);
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void clearWriteRequestQueue( SocketSessionImpl session )
-    {
+    private void clearWriteRequestQueue(SocketSessionImpl session) {
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        while( ( req = writeRequestQueue.poll() ) != null )
-        {
-            req.getFuture().setWritten( false );
+        while ((req = writeRequestQueue.poll()) != null) {
+            req.getFuture().setWritten(false);
         }
     }
 
-    private void doFlush( SocketSessionImpl session ) throws IOException
-    {
+    private void doFlush(SocketSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
         SocketChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        for( ; ; )
-        {
+        for (;;) {
             WriteRequest req;
 
-            synchronized( writeRequestQueue )
-            {
+            synchronized (writeRequestQueue) {
                 req = writeRequestQueue.peek();
             }
 
-            if( req == null ) {
+            if (req == null) {
                 break;
             }
 
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
-            {
-                synchronized( writeRequestQueue )
-                {
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0) {
+                synchronized (writeRequestQueue) {
                     writeRequestQueue.poll();
                 }
 
                 session.increaseWrittenMessages();
 
                 buf.reset();
-                session.getFilterChain().fireMessageSent( session, req );
+                session.getFilterChain().fireMessageSent(session, req);
                 continue;
             }
 
-            if( key.isWritable() )
-            {
-                int writtenBytes = ch.write( buf.buf() );
-                if( writtenBytes > 0 )
-                {
-                    session.increaseWrittenBytes( writtenBytes );
+            if (key.isWritable()) {
+                int writtenBytes = ch.write(buf.buf());
+                if (writtenBytes > 0) {
+                    session.increaseWrittenBytes(writtenBytes);
                 }
             }
 
-            if( buf.hasRemaining() )
-            {
+            if (buf.hasRemaining()) {
                 // Kernel buffer is full
-                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                 break;
             }
         }
     }
 
-    private void doUpdateTrafficMask()
-    {
-        for( ; ; )
-        {
+    private void doUpdateTrafficMask() {
+        for (;;) {
             SocketSessionImpl session;
 
             session = trafficControllingSessions.poll();
 
-            if( session == null ) {
+            if (session == null) {
                 break;
             }
 
@@ -501,82 +420,66 @@
             // Retry later if session is not yet fully initialized.
             // (In case that Session.suspend??() or session.resume??() is 
             // called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleTrafficControl( session );
+            if (key == null) {
+                scheduleTrafficControl(session);
                 break;
             }
             // skip if channel is already closed
-            if( !key.isValid() )
-            {
+            if (!key.isValid()) {
                 continue;
             }
 
             // The normal is OP_READ and, if there are write requests in the
             // session's write queue, set OP_WRITE to trigger flushing.
             int ops = SelectionKey.OP_READ;
-            Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-            synchronized( writeRequestQueue )
-            {
-                if( !writeRequestQueue.isEmpty() )
-                {
+            Queue<WriteRequest> writeRequestQueue = session
+                    .getWriteRequestQueue();
+            synchronized (writeRequestQueue) {
+                if (!writeRequestQueue.isEmpty()) {
                     ops |= SelectionKey.OP_WRITE;
                 }
             }
 
             // Now mask the preferred ops with the mask of the current session
             int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps( ops & mask );
+            key.interestOps(ops & mask);
         }
     }
 
+    private class Worker implements Runnable {
+        public void run() {
+            Thread.currentThread().setName(SocketIoProcessor.this.threadName);
 
-    private class Worker implements Runnable
-    {
-        public void run()
-        {
-            Thread.currentThread().setName( SocketIoProcessor.this.threadName );
-
-            for( ; ; )
-            {
-                try
-                {
-                    int nKeys = selector.select( 1000 );
+            for (;;) {
+                try {
+                    int nKeys = selector.select(1000);
                     doAddNew();
                     doUpdateTrafficMask();
 
-                    if( nKeys > 0 )
-                    {
-                        process( selector.selectedKeys() );
+                    if (nKeys > 0) {
+                        process(selector.selectedKeys());
                     }
 
                     doFlush();
                     doRemove();
                     notifyIdleness();
 
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( lock )
-                        {
-                            if( selector.keys().isEmpty() && newSessions.isEmpty() )
-                            {
+                    if (selector.keys().isEmpty()) {
+                        synchronized (lock) {
+                            if (selector.keys().isEmpty()
+                                    && newSessions.isEmpty()) {
                                 worker = null;
                                 break;
                             }
                         }
                     }
-                }
-                catch( Throwable t )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( t );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                } catch (Throwable t) {
+                    ExceptionMonitor.getInstance().exceptionCaught(t);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                     }
                 }
             }



Mime
View raw message