mina-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1069498 - in /mina/branches/2.0.3: mina-core/src/main/java/org/apache/mina/core/polling/ mina-core/src/main/java/org/apache/mina/transport/socket/nio/ mina-core/src/test/java/org/apache/mina/transport/socket/nio/ mina-transport-apr/src/mai...
Date Thu, 10 Feb 2011 17:53:29 GMT
Author: elecharny
Date: Thu Feb 10 17:53:29 2011
New Revision: 1069498

URL: http://svn.apache.org/viewvc?rev=1069498&view=rev
Log:
Re-injected the code which checks that the epoll issue is not encountered again.

Modified:
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
    mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
    mina/branches/2.0.3/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
    mina/branches/2.0.3/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java?rev=1069498&r1=1069497&r2=1069498&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
(original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
Thu Feb 10 17:53:29 2011
@@ -55,7 +55,7 @@ import org.apache.mina.util.ExceptionMon
  *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  * @org.apache.xbean.XBean
- * 
+ *
   * @param <S> the type of the {@link IoSession} this processor can handle
 */
 public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H>
@@ -86,32 +86,36 @@ public abstract class AbstractPollingCon
     private final ServiceOperationFuture disposalFuture =
         new ServiceOperationFuture();
     private volatile boolean selectable;
-    
-    /** The thread responsible of accepting incoming requests */ 
+
+    /** The thread responsible of accepting incoming requests */
     private Acceptor acceptor;
 
     private long lastIdleCheckTime;
-    
+
     private String getAddressAsString(SocketAddress address) {
-    	InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
-    	int port = ((InetSocketAddress)address).getPort();
-    	
-    	String result = null;
-    	
-    	if ( inetAddress instanceof Inet4Address ) {
-    		result = "/" + inetAddress.getHostAddress() + ":" + port;
-    	} else {
-    		// Inet6
-    		if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
-    			byte[] bytes = inetAddress.getAddress();
-    			
-    			result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
-    		} else {
-    			result = inetAddress.toString();
-    		}
-    	}
-    	
-    	return result;
+        InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
+        int port = ((InetSocketAddress)address).getPort();
+
+        if (inetAddress == null) {
+            return "null";
+        }
+
+        String result = null;
+
+        if ( inetAddress instanceof Inet4Address ) {
+            result = "/" + inetAddress.getHostAddress() + ":" + port;
+        } else {
+            // Inet6
+            if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
+                byte[] bytes = inetAddress.getAddress();
+
+                result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
+            } else {
+                result = inetAddress.toString();
+            }
+        }
+
+        return result;
     }
 
     /**
@@ -191,9 +195,9 @@ public abstract class AbstractPollingCon
         // creates the Acceptor instance and has the local
         // executor kick it off.
         startupAcceptor();
-        
+
         // As we just started the acceptor, we have to unblock the select()
-        // in order to process the bind request we just have added to the 
+        // in order to process the bind request we just have added to the
         // registerQueue.
         wakeup();
 
@@ -212,7 +216,7 @@ public abstract class AbstractPollingCon
         for (H handle : boundHandles.values()) {
             newLocalAddresses.add(localAddress(handle));
         }
-        
+
         return newLocalAddresses;
     }
 
@@ -267,17 +271,17 @@ public abstract class AbstractPollingCon
     private IoSession newSessionWithoutLock(
             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
         H handle = boundHandles.get(getAddressAsString(localAddress));
-        
+
         if (handle == null) {
             throw new IllegalArgumentException("Unknown local address: " + localAddress);
         }
 
         IoSession session;
         IoSessionRecycler sessionRecycler = getSessionRecycler();
-        
+
         synchronized (sessionRecycler) {
             session = sessionRecycler.recycle(localAddress, remoteAddress);
-            
+
             if (session != null) {
                 return session;
             }
@@ -314,7 +318,7 @@ public abstract class AbstractPollingCon
             if (sessionRecycler == null) {
                 sessionRecycler = DEFAULT_RECYCLER;
             }
-            
+
             this.sessionRecycler = sessionRecycler;
         }
     }
@@ -382,9 +386,9 @@ public abstract class AbstractPollingCon
     }
 
     /**
-     * This private class is used to accept incoming connection from 
+     * This private class is used to accept incoming connection from
      * clients. It's an infinite loop, which can be stopped when all
-     * the registered handles have been removed (unbound). 
+     * the registered handles have been removed (unbound).
      */
     private class Acceptor implements Runnable {
         public void run() {
@@ -446,7 +450,7 @@ public abstract class AbstractPollingCon
         while (handles.hasNext()) {
             H h = handles.next();
             handles.remove();
-            
+
             try {
                 if (isReadable(h)) {
                     readHandle(h);
@@ -468,7 +472,7 @@ public abstract class AbstractPollingCon
                 getSessionConfig().getReadBufferSize());
 
         SocketAddress remoteAddress = receive(handle, readBuf);
-        
+
         if (remoteAddress != null) {
             IoSession session = newSessionWithoutLock(
                     remoteAddress, localAddress(handle));
@@ -486,7 +490,7 @@ public abstract class AbstractPollingCon
     private void flushSessions(long currentTime) {
         for (;;) {
             S session = flushingSessions.poll();
-            
+
             if (session == null) {
                 break;
             }
@@ -517,11 +521,11 @@ public abstract class AbstractPollingCon
             (session.getConfig().getMaxReadBufferSize() >>> 1);
 
         int writtenBytes = 0;
-        
+
         try {
             for (;;) {
                 WriteRequest req = session.getCurrentWriteRequest();
-                
+
                 if (req == null) {
                     req = writeRequestQueue.poll(session);
                     if (req == null) {
@@ -531,7 +535,7 @@ public abstract class AbstractPollingCon
                 }
 
                 IoBuffer buf = (IoBuffer) req.getMessage();
-                
+
                 if (buf.remaining() == 0) {
                     // Clear and fire event
                     session.setCurrentWriteRequest(null);
@@ -541,14 +545,14 @@ public abstract class AbstractPollingCon
                 }
 
                 SocketAddress destination = req.getDestination();
-                
+
                 if (destination == null) {
                     destination = session.getRemoteAddress();
                 }
 
                 int localWrittenBytes = send(session, buf, destination);
-                
-                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+
+                if (( localWrittenBytes == 0 ) || ( writtenBytes >= maxWrittenBytes )) {
                     // Kernel buffer is full or wrote too much
                     setInterestedInWrite(session, true);
                     return false;
@@ -572,25 +576,25 @@ public abstract class AbstractPollingCon
     private int registerHandles() {
         for (;;) {
             AcceptorOperationFuture req = registerQueue.poll();
-            
+
             if (req == null) {
                 break;
             }
 
             Map<String, H> newHandles = new HashMap<String, H>();
             List<SocketAddress> localAddresses = req.getLocalAddresses();
-            
+
             try {
                 for (SocketAddress socketAddress : localAddresses) {
                     H handle = open(socketAddress);
                     newHandles.put(getAddressAsString(localAddress(handle)), handle);
                 }
-                
+
                 boundHandles.putAll(newHandles);
 
                 getListeners().fireServiceActivated();
                 req.setDone();
-                
+
                 return newHandles.size();
             } catch (Exception e) {
                 req.setException(e);
@@ -604,7 +608,7 @@ public abstract class AbstractPollingCon
                             ExceptionMonitor.getInstance().exceptionCaught(e);
                         }
                     }
-                    
+
                     wakeup();
                 }
             }
@@ -615,7 +619,7 @@ public abstract class AbstractPollingCon
 
     private int unregisterHandles() {
         int nHandles = 0;
-        
+
         for (;;) {
             AcceptorOperationFuture request = cancelQueue.poll();
             if (request == null) {
@@ -625,7 +629,7 @@ public abstract class AbstractPollingCon
             // close the channels
             for (SocketAddress socketAddress : request.getLocalAddresses()) {
                 H handle = boundHandles.remove(getAddressAsString(socketAddress));
-                
+
                 if (handle == null) {
                     continue;
                 }

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=1069498&r1=1069497&r2=1069498&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
(original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Thu Feb 10 17:53:29 2011
@@ -59,9 +59,9 @@ import org.slf4j.LoggerFactory;
  * developers to write an {@link IoProcessor} easily. This class is in charge of
  * active polling a set of {@link IoSession} and trigger events when some I/O
  * operation is possible.
- * 
+ *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
- * 
+ *
  * @param <S> the type of the {@link IoSession} this processor can handle
  */
 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
@@ -127,7 +127,7 @@ public abstract class AbstractPollingIoP
     /**
      * Create an {@link AbstractPollingIoProcessor} with the given
      * {@link Executor} for handling I/Os events.
-     * 
+     *
      * @param executor
      *            the {@link Executor} for handling I/O events
      */
@@ -144,7 +144,7 @@ public abstract class AbstractPollingIoP
      * Compute the thread ID for this class instance. As we may have different
      * classes, we store the last ID number into a Map associating the class
      * name to the last assigned ID.
-     * 
+     *
      * @return a name for the current thread, based on the class name and an
      *         incremental value, starting at 1.
      */
@@ -209,14 +209,14 @@ public abstract class AbstractPollingIoP
     /**
      * Dispose the resources used by this {@link IoProcessor} for polling the
      * client connections. The implementing class doDispose method will be called.
-     * 
+     *
      * @throws Exception if some low level IO error occurs
      */
     protected abstract void doDispose() throws Exception;
 
     /**
      * poll those sessions for the given timeout
-     * 
+     *
      * @param timeout
      *            milliseconds before the call timeout if no event appear
      * @return The number of session ready for read or for write
@@ -227,7 +227,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * poll those sessions forever
-     * 
+     *
      * @return The number of session ready for read or for write
      * @throws Exception
      *             if some low level IO error occurs
@@ -237,7 +237,7 @@ public abstract class AbstractPollingIoP
     /**
      * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
      * is empty
-     * 
+     *
      * @return true if at least a session is managed by this {@link IoProcessor}
      */
     protected abstract boolean isSelectorEmpty();
@@ -250,13 +250,13 @@ public abstract class AbstractPollingIoP
     /**
      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
      * {@link IoProcessor}
-     * 
+     *
      * @return {@link Iterator} of {@link IoSession}
      */
     protected abstract Iterator<S> allSessions();
 
     /**
-     * Get an {@link Iterator} for the list of {@link IoSession} found selected 
+     * Get an {@link Iterator} for the list of {@link IoSession} found selected
      * by the last call of {@link AbstractPollingIoProcessor#select(int)
      * @return {@link Iterator} of {@link IoSession} read for I/Os operation
      */
@@ -264,7 +264,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * Get the state of a session (preparing, open, closed)
-     * 
+     *
      * @param session
      *            the {@link IoSession} to inspect
      * @return the state of the session
@@ -273,7 +273,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * Is the session ready for writing
-     * 
+     *
      * @param session
      *            the session queried
      * @return true is ready, false if not ready
@@ -282,7 +282,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * Is the session ready for reading
-     * 
+     *
      * @param session
      *            the session queried
      * @return true is ready, false if not ready
@@ -291,7 +291,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * register a session for writing
-     * 
+     *
      * @param session
      *            the session registered
      * @param isInterested
@@ -302,7 +302,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * register a session for reading
-     * 
+     *
      * @param session
      *            the session registered
      * @param isInterested
@@ -313,7 +313,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * is this session registered for reading
-     * 
+     *
      * @param session
      *            the session queried
      * @return true is registered for reading
@@ -322,7 +322,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * is this session registered for writing
-     * 
+     *
      * @param session
      *            the session queried
      * @return true is registered for writing
@@ -331,7 +331,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * Initialize the polling of a session. Add it to the polling process.
-     * 
+     *
      * @param session the {@link IoSession} to add to the polling
      * @throws Exception any exception thrown by the underlying system calls
      */
@@ -339,7 +339,7 @@ public abstract class AbstractPollingIoP
 
     /**
      * Destroy the underlying client socket handle
-     * 
+     *
      * @param session
      *            the {@link IoSession}
      * @throws Exception
@@ -350,7 +350,7 @@ public abstract class AbstractPollingIoP
     /**
      * Reads a sequence of bytes from a {@link IoSession} into the given
      * {@link IoBuffer}. Is called when the session was found ready for reading.
-     * 
+     *
      * @param session
      *            the session to read
      * @param buf
@@ -364,7 +364,7 @@ public abstract class AbstractPollingIoP
     /**
      * Write a sequence of bytes to a {@link IoSession}, means to be called when
      * a session was found ready for writing.
-     * 
+     *
      * @param session
      *            the session to write
      * @param buf
@@ -384,7 +384,7 @@ public abstract class AbstractPollingIoP
      * isn't supporting system calls like sendfile(), you can throw a
      * {@link UnsupportedOperationException} so the file will be send using
      * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
-     * 
+     *
      * @param session
      *            the session to write
      * @param region
@@ -469,9 +469,30 @@ public abstract class AbstractPollingIoP
     }
 
     /**
+     * In the case we are using the java select() method, this method is used to
+     * trash the buggy selector and create a new one, registring all the sockets
+     * on it.
+     *
+     * @throws IOException
+     *             If we got an exception
+     */
+    abstract protected void registerNewSelector() throws IOException;
+
+    /**
+     * Check that the select() has not exited immediately just because of a
+     * broken connection. In this case, this is a standard case, and we just
+     * have to loop.
+     *
+     * @return true if a connection has been brutally closed.
+     * @throws IOException
+     *             If we got an exception
+     */
+    abstract protected boolean isBrokenConnection() throws IOException;
+
+    /**
      * Loops over the new sessions blocking queue and returns the number of
      * sessions which are effectively created
-     * 
+     *
      * @return The number of new sessions
      */
     private int handleNewSessions() {
@@ -514,7 +535,7 @@ public abstract class AbstractPollingIoP
             listeners.fireSessionCreated(session);
         } catch (Throwable e) {
             ExceptionMonitor.getInstance().exceptionCaught(e);
-            
+
             try {
                 destroy(session);
             } catch (Exception e1) {
@@ -523,7 +544,7 @@ public abstract class AbstractPollingIoP
                 registered = false;
             }
         }
-        
+
         return registered;
     }
 
@@ -540,29 +561,29 @@ public abstract class AbstractPollingIoP
                     if (removeNow(session)) {
                         removedSessions++;
                     }
-    
+
                     break;
-    
+
                 case CLOSING:
                     // Skip if channel is already closed
                     break;
-    
+
                 case OPENING:
                     // Remove session from the newSessions queue and
                     // remove it
                     newSessions.remove(session);
-    
+
                     if (removeNow(session)) {
                         removedSessions++;
                     }
-                    
+
                     break;
-    
+
                 default:
                     throw new IllegalStateException(String.valueOf(state));
             }
         }
-        
+
         return removedSessions;
     }
 
@@ -591,7 +612,7 @@ public abstract class AbstractPollingIoP
 
         if ((req = writeRequestQueue.poll(session)) != null) {
             Object message = req.getMessage();
-            
+
             if (message instanceof IoBuffer) {
                 IoBuffer buf = (IoBuffer)message;
 
@@ -618,12 +639,12 @@ public abstract class AbstractPollingIoP
         if (!failedRequests.isEmpty()) {
             WriteToClosedSessionException cause = new WriteToClosedSessionException(
                     failedRequests);
-            
+
             for (WriteRequest r : failedRequests) {
                 session.decreaseScheduledBytesAndMessages(r);
                 r.getFuture().setException(cause);
             }
-            
+
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(cause);
         }
@@ -651,7 +672,7 @@ public abstract class AbstractPollingIoP
             // add the session to the queue, if it's not already there
             if (session.setScheduledForFlush(true)) {
                 flushingSessions.add(session);
-            }       
+            }
         }
     }
 
@@ -669,17 +690,17 @@ public abstract class AbstractPollingIoP
 
             try {
                 if (hasFragmentation) {
-                    
+
                     while ((ret = read(session, buf)) > 0) {
                         readBytes += ret;
-                        
+
                         if (!buf.hasRemaining()) {
                             break;
                         }
                     }
                 } else {
                     ret = read(session, buf);
-                    
+
                     if (ret > 0) {
                         readBytes = ret;
                     }
@@ -759,7 +780,7 @@ public abstract class AbstractPollingIoP
 
         do {
             S session = flushingSessions.poll(); // the same one with firstSession
-            
+
             if (session == null) {
                 // Just in case ... It should not happen.
                 break;
@@ -768,14 +789,14 @@ public abstract class AbstractPollingIoP
             // Reset the Schedule for flush flag for this session,
             // as we are flushing it now
             session.unscheduledForFlush();
-            
+
             SessionState state = getState(session);
 
             switch (state) {
                 case OPENED:
                     try {
                         boolean flushedAll = flushNow(session, currentTime);
-                        
+
                         if (flushedAll
                                 && !session.getWriteRequestQueue().isEmpty(session)
                                 && !session.isScheduledForFlush()) {
@@ -786,20 +807,20 @@ public abstract class AbstractPollingIoP
                         IoFilterChain filterChain = session.getFilterChain();
                         filterChain.fireExceptionCaught(e);
                     }
-    
+
                     break;
-    
+
                 case CLOSING:
                     // Skip if the channel is already closed.
                     break;
-    
+
                 case OPENING:
                     // Retry later if session is not yet fully initialized.
                     // (In case that Session.write() is called before addSession()
                     // is processed)
                     scheduleFlush(session);
                     return;
-    
+
                 default:
                     throw new IllegalStateException(String.valueOf(state));
             }
@@ -826,34 +847,34 @@ public abstract class AbstractPollingIoP
                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
         int writtenBytes = 0;
         WriteRequest req = null;
-        
+
         try {
             // Clear OP_WRITE
             setInterestedInWrite(session, false);
-            
+
             do {
                 // Check for pending writes.
                 req = session.getCurrentWriteRequest();
-                
+
                 if (req == null) {
                     req = writeRequestQueue.poll(session);
-                    
+
                     if (req == null) {
                         break;
                     }
-                    
+
                     session.setCurrentWriteRequest(req);
                 }
 
                 int localWrittenBytes = 0;
                 Object message = req.getMessage();
-                
+
                 if (message instanceof IoBuffer) {
                     localWrittenBytes = writeBuffer(session, req,
                             hasFragmentation, maxWrittenBytes - writtenBytes,
                             currentTime);
-                    
-                    if (localWrittenBytes > 0
+
+                    if (( localWrittenBytes > 0 )
                             && ((IoBuffer) message).hasRemaining()) {
                         // the buffer isn't empty, we re-interest it in writing
                         writtenBytes += localWrittenBytes;
@@ -870,8 +891,8 @@ public abstract class AbstractPollingIoP
                     // If there's still data to be written in the FileRegion,
                     // return 0 indicating that we need
                     // to pause until writing may resume.
-                    if (localWrittenBytes > 0
-                            && ((FileRegion) message).getRemainingBytes() > 0) {
+                    if (( localWrittenBytes > 0 )
+                            && ( ((FileRegion) message).getRemainingBytes() > 0 )) {
                         writtenBytes += localWrittenBytes;
                         setInterestedInWrite(session, true);
                         return false;
@@ -901,7 +922,7 @@ public abstract class AbstractPollingIoP
             if (req != null) {
                 req.getFuture().setException(e);
             }
-            
+
             IoFilterChain filterChain = session.getFilterChain();
             filterChain.fireExceptionCaught(e);
             return false;
@@ -915,28 +936,28 @@ public abstract class AbstractPollingIoP
             throws Exception {
         IoBuffer buf = (IoBuffer) req.getMessage();
         int localWrittenBytes = 0;
-        
+
         if (buf.hasRemaining()) {
             int length;
-            
+
             if (hasFragmentation) {
                 length = Math.min(buf.remaining(), maxLength);
             } else {
                 length = buf.remaining();
             }
-            
+
             localWrittenBytes = write(session, buf, length);
         }
 
         session.increaseWrittenBytes(localWrittenBytes, currentTime);
 
-        if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
+        if (!buf.hasRemaining() || ( !hasFragmentation && ( localWrittenBytes != 0 ) )) {
             // Buffer has been sent, clear the current request.
             int pos = buf.position();
             buf.reset();
-            
+
             fireMessageSent(session, req);
-            
+
             // And set it back to its position
             buf.position(pos);
         }
@@ -948,17 +969,17 @@ public abstract class AbstractPollingIoP
             throws Exception {
         int localWrittenBytes;
         FileRegion region = (FileRegion) req.getMessage();
-        
+
         if (region.getRemainingBytes() > 0) {
             int length;
-            
+
             if (hasFragmentation) {
                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
             } else {
                 length = (int) Math.min(Integer.MAX_VALUE, region
                         .getRemainingBytes());
             }
-            
+
             localWrittenBytes = transferFile(session, region, length);
             region.update(localWrittenBytes);
         } else {
@@ -967,8 +988,8 @@ public abstract class AbstractPollingIoP
 
         session.increaseWrittenBytes(localWrittenBytes, currentTime);
 
-        if (region.getRemainingBytes() <= 0 || !hasFragmentation
-                && localWrittenBytes != 0) {
+        if (( region.getRemainingBytes() <= 0 ) || ( !hasFragmentation
+                && ( localWrittenBytes != 0 ) )) {
             fireMessageSent(session, req);
         }
 
@@ -1002,10 +1023,10 @@ public abstract class AbstractPollingIoP
                     updateTrafficControl(session);
 
                     break;
-    
+
                 case CLOSING:
                     break;
-    
+
                 case OPENING:
                     // Retry later if session is not yet fully initialized.
                     // (In case that Session.suspend??() or session.resume??() is
@@ -1013,14 +1034,14 @@ public abstract class AbstractPollingIoP
                     // We just put back the session at the end of the queue.
                     trafficControllingSessions.add(session);
                     break;
-    
+
                 default:
                     throw new IllegalStateException(String.valueOf(state));
             }
-            
+
             // As we have handled one session, decrement the number of
             // remaining sessions. The OPENING session will be processed
-            // with the next select(), as the queue size has been decreased, even 
+            // with the next select(), as the queue size has been decreased, even
             // if the session has been pushed at the end of the queue
             queueSize--;
         }
@@ -1030,7 +1051,7 @@ public abstract class AbstractPollingIoP
      * {@inheritDoc}
      */
     public void updateTrafficControl(S session) {
-        // 
+        //
         try {
             setInterestedInRead(session, !session.isReadSuspended());
         } catch (Exception e) {
@@ -1049,10 +1070,10 @@ public abstract class AbstractPollingIoP
     }
 
     /**
-     * The main loop. This is the place in charge to poll the Selector, and to 
-     * process the active sessions. It's done in 
+     * The main loop. This is the place in charge to poll the Selector, and to
+     * process the active sessions. It's done in
      * - handle the newly created sessions
-     * - 
+     * -
      */
     private class Processor implements Runnable {
         public void run() {
@@ -1065,11 +1086,49 @@ public abstract class AbstractPollingIoP
                     // idle session when we get out of the select every
                     // second. (note : this is a hack to avoid creating
                     // a dedicated thread).
+                    long t0 = System.currentTimeMillis();
                     int selected = select(SELECT_TIMEOUT);
+                    long t1 = System.currentTimeMillis();
+                    long delta = (t1 - t0);
+
+                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
+                        // Last chance : the select() may have been
+                        // interrupted because we have had an closed channel.
+                        if (isBrokenConnection()) {
+                            LOG.warn("Broken connection");
+
+                            // we can reselect immediately
+                            // set back the flag to false
+                            wakeupCalled.getAndSet(false);
+
+                            continue;
+                        } else {
+                            LOG.warn("Create a new selector. Selected is 0, delta = "
+                                            + (t1 - t0));
+                            // Ok, we are hit by the nasty epoll
+                            // spinning.
+                            // Basically, there is a race condition
+                            // which causes a closing file descriptor not to be
+                            // considered as available as a selected channel, but
+                            // it stopped the select. The next time we will
+                            // call select(), it will exit immediately for the same
+                            // reason, and do so forever, consuming 100%
+                            // CPU.
+                            // We have to destroy the selector, and
+                            // register all the socket on a new one.
+                            registerNewSelector();
+                        }
+
+                        // Set back the flag to false
+                        wakeupCalled.getAndSet(false);
+
+                        // and continue the loop
+                        continue;
+                    }
 
                     // Manage newly created session first
                     nSessions += handleNewSessions();
-                    
+
                     updateTrafficMask();
 
                     // Now, if we have had some incoming or outgoing events,
@@ -1082,10 +1141,10 @@ public abstract class AbstractPollingIoP
                     // Write the pending requests
                     long currentTime = System.currentTimeMillis();
                     flush(currentTime);
-                    
+
                     // And manage removed sessions
                     nSessions -= removeSessions();
-                    
+
                     // Last, not least, send Idle events to the idle sessions
                     notifyIdleSessions(currentTime);
 
@@ -1106,7 +1165,7 @@ public abstract class AbstractPollingIoP
                         for (Iterator<S> i = allSessions(); i.hasNext();) {
                             scheduleRemove(i.next());
                         }
-                        
+
                         wakeup();
                     }
                 } catch (ClosedSelectorException cse) {

Modified: mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=1069498&r1=1069497&r2=1069498&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
(original)
+++ mina/branches/2.0.3/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
Thu Feb 10 17:53:29 2011
@@ -21,9 +21,11 @@ package org.apache.mina.transport.socket
 
 import java.io.IOException;
 import java.nio.channels.ByteChannel;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -36,7 +38,7 @@ import org.apache.mina.core.session.Sess
 
 /**
  * TODO Add documentation
- * 
+ *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
@@ -44,14 +46,14 @@ public final class NioProcessor extends 
     private Selector selector;
 
     /**
-     * 
+     *
      * Creates a new instance of NioProcessor.
-     * 
+     *
      * @param executor
      */
     public NioProcessor(Executor executor) {
         super(executor);
-        
+
         try {
             // Open a new selector
             selector = Selector.open();
@@ -115,6 +117,70 @@ public final class NioProcessor extends 
         ch.close();
     }
 
+
+    /**
+     * In the case we are using the java select() method, this method is used to
+     * trash the buggy selector and create a new one, registering all the
+     * sockets on it.
+     */
+    @Override
+    protected void registerNewSelector() throws IOException {
+        synchronized (selector) {
+            Set<SelectionKey> keys = selector.keys();
+
+            // Open a new selector
+            Selector newSelector = Selector.open();
+
+            // Loop on all the registered keys, and register them on the new selector
+            for (SelectionKey key : keys) {
+                SelectableChannel ch = key.channel();
+
+                // Attach the session to the new key, and store the new key back in the session
+                NioSession session = (NioSession)key.attachment();
+                SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
+                session.setSelectionKey( newKey );
+            }
+
+            // Now we can close the old selector and switch it
+            selector.close();
+            selector = newSelector;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected boolean isBrokenConnection() throws IOException {
+        // A flag set to true if we find a broken session
+        boolean brokenSession = false;
+
+        synchronized (selector) {
+            // Get the selector keys
+            Set<SelectionKey> keys = selector.keys();
+
+            // Loop on all the keys to see if one of them
+            // has a channel which is no longer connected
+            for (SelectionKey key : keys) {
+                SelectableChannel channel = key.channel();
+
+                if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel)
+                        .isConnected()))
+                        || ((channel instanceof SocketChannel) && !((SocketChannel) channel)
+                                .isConnected())) {
+                    // The channel is not connected anymore. Cancel
+                    // the associated key then.
+                    key.cancel();
+
+                    // Set the flag to true to avoid a selector switch
+                    brokenSession = true;
+                }
+            }
+        }
+
+        return brokenSession;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -151,14 +217,14 @@ public final class NioProcessor extends 
     @Override
     protected boolean isInterestedInRead(NioSession session) {
         SelectionKey key = session.getSelectionKey();
-        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) !=
0;
+        return key.isValid() && ( (key.interestOps() & SelectionKey.OP_READ)
!= 0 );
     }
 
     @Override
     protected boolean isInterestedInWrite(NioSession session) {
         SelectionKey key = session.getSelectionKey();
         return key.isValid()
-                && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
+                && ( (key.interestOps() & SelectionKey.OP_WRITE) != 0 );
     }
 
     /**
@@ -193,7 +259,7 @@ public final class NioProcessor extends 
         if (key == null) {
             return;
         }
-        
+
         int newInterestOps = key.interestOps();
 
         if (isInterested) {
@@ -210,7 +276,7 @@ public final class NioProcessor extends 
     @Override
     protected int read(NioSession session, IoBuffer buf) throws Exception {
         ByteChannel channel = session.getChannel();
-        
+
         return session.getChannel().read(buf.buf());
     }
 
@@ -240,7 +306,7 @@ public final class NioProcessor extends 
             // Check to see if the IOException is being thrown due to
             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
             String message = e.getMessage();
-            if (message != null && message.contains("temporarily unavailable")) {
+            if (( message != null ) && message.contains("temporarily unavailable"))
{
                 return 0;
             }
 
@@ -258,7 +324,7 @@ public final class NioProcessor extends 
 
         /**
          * Create this iterator as a wrapper on top of the selectionKey Set.
-         * 
+         *
          * @param keys
          *            The set of selected sessions
          */

Modified: mina/branches/2.0.3/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java?rev=1069498&r1=1069497&r2=1069498&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
(original)
+++ mina/branches/2.0.3/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
Thu Feb 10 17:53:29 2011
@@ -21,6 +21,7 @@ package org.apache.mina.transport.socket
 
 import static org.junit.Assert.assertNotNull;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.NoRouteToHostException;
 import java.util.Iterator;
@@ -43,7 +44,7 @@ import org.junit.Test;
 
 /**
  * Tests non regression on issue DIRMINA-632.
- * 
+ *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class PollingIoProcessorTest {
@@ -150,6 +151,16 @@ public class PollingIoProcessorTest {
             protected int write(NioSession session, IoBuffer buf, int length) throws Exception
{
                 throw new NoRouteToHostException("No Route To Host Test");
             }
+
+            @Override
+            protected boolean isBrokenConnection() throws IOException {
+                return proc.isBrokenConnection();
+            }
+
+            @Override
+            protected void registerNewSelector() throws IOException {
+                proc.registerNewSelector();
+            }
         });
         connector.setHandler(new IoHandlerAdapter());
 

Modified: mina/branches/2.0.3/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/2.0.3/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=1069498&r1=1069497&r2=1069498&view=diff
==============================================================================
--- mina/branches/2.0.3/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
(original)
+++ mina/branches/2.0.3/mina-transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
Thu Feb 10 17:53:29 2011
@@ -42,7 +42,7 @@ import org.apache.tomcat.jni.Status;
 /**
  * The class in charge of processing socket level IO events for the
  * {@link AprSocketConnector}
- * 
+ *
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
@@ -63,7 +63,7 @@ public final class AprIoProcessor extend
     /**
      * Create a new instance of {@link AprIoProcessor} with a given Exector for
      * handling I/Os events.
-     * 
+     *
      * @param executor
      *            the {@link Executor} for handling I/O events
      */
@@ -466,4 +466,21 @@ public final class AprIoProcessor extend
     private void throwException(int code) throws IOException {
         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " +
code + ")");
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void registerNewSelector() {
+        // Do nothing: isBrokenConnection() always returns true in this class
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected boolean isBrokenConnection() throws IOException {
+        // Always report a broken connection, so that the caller never asks for a new selector.
+        return true;
+    }
 }
\ No newline at end of file



Mime
View raw message