geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject cvs commit: incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol DatagramProtocol.java ServerSocketAcceptor.java SocketProtocol.java
Date Tue, 04 May 2004 03:05:36 GMT
adc         2004/05/03 20:05:36

  Modified:    modules/network/src/java/org/apache/geronimo/network
                        SelectionEventListner.java SelectorManager.java
               modules/network/src/java/org/apache/geronimo/network/protocol
                        DatagramProtocol.java ServerSocketAcceptor.java
                        SocketProtocol.java
  Log:
  Workarounds to NIO bugs and a few fixes of my own bugs.
  
  Revision  Changes    Path
  1.3       +2 -2      incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectionEventListner.java
  
  Index: SelectionEventListner.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectionEventListner.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SelectionEventListner.java	10 Mar 2004 09:59:12 -0000	1.2
  +++ SelectionEventListner.java	4 May 2004 03:05:36 -0000	1.3
  @@ -33,5 +33,5 @@
        * When the SelectorKey is triggered, the service method will
        * be called on the attachment.
        */
  -    public void selectionEvent(SelectionKey selection);
  +    public void selectionEvent(SelectorManager.Event event);
   }
  
  
  
  1.8       +95 -34    incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectorManager.java
  
  Index: SelectorManager.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/SelectorManager.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SelectorManager.java	1 May 2004 23:16:22 -0000	1.7
  +++ SelectorManager.java	4 May 2004 03:05:36 -0000	1.8
  @@ -25,6 +25,7 @@
   import java.nio.channels.Selector;
   import java.util.Iterator;
   import java.util.Set;
  +import java.util.Stack;
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  @@ -57,11 +58,6 @@
       private volatile boolean running;
   
       /**
  -     * The guard
  -     */
  -    private Object guard = new Object();
  -
  -    /**
        * The selector used to wait for non-blocking events.
        */
       private Selector selector;
  @@ -86,6 +82,11 @@
        */
       private int startCounter;
   
  +    /**
  +     * A list of channels to be closed.
  +     */
  +    private Stack closing = new Stack();
  +
   
       public SelectorManager() throws IOException {
           threadGroup = new ThreadGroup("Geronimo NIO Workers");
  @@ -127,15 +128,51 @@
           try {
   
               log.debug("Selector Work thread has started.");
  -            log.debug("Selector Manager timeout: "+timeout);
  +            log.debug("Selector Manager timeout: " + timeout);
               while (running) {
                   try {
   
  -                    synchronized (guard) { /* do nothing */
  -                        log.trace("Waiting for selector to return.");
  +                    synchronized (closing) {
  +                        if (!closing.isEmpty()) {
  +                            /**
  +                             * Close channels that have been queued up to be
  +                             * closed.  Closing channels in this manner prevents
  +                             * NullPointExceptions.
  +                             *
  +                             * http://developer.java.sun.com/developer/bugParade/bugs/4729342.html
  +                             */
  +                            Iterator iter = closing.iterator();
  +
  +                            while (iter.hasNext()) {
  +                                SelectableChannel selectableChannel = (SelectableChannel) iter.next();
  +                                selectableChannel.close();
  +                            }
  +                            closing.clear();
  +                        }
                       }
   
  -                    if (selector.select(timeout) == 0) continue;
  +                    log.trace("Waiting for selector to return.");
  +                    if (selector.select(timeout) == 0) {
  +                        /**
  +                         * Clean stale connections that do not have and data: select
  +                         * returns indicating that the count of active connections with
  +                         * input is 0.  However the list still has these "stale"
  +                         * connections lingering around.  We remove them since they
  +                         * are prematurely triggering selection to return w/o input.
  +                         *
  +                         * http://nagoya.apache.org/jira/secure/ViewIssue.jspa?key=DIR-18
  +                         */
  +                        Iterator list = selector.selectedKeys().iterator();
  +
  +                        while (list.hasNext()) {
  +                            SelectionKey key = (SelectionKey) list.next();
  +                            key.channel().close();
  +                            key.cancel();
  +                            list.remove();
  +                        }
  +
  +                        continue;
  +                    }
   
                       // Get a java.util.Set containing the SelectionKey objects for
                       // all channels that are ready for I/O.
  @@ -143,73 +180,62 @@
   
                       // Use a java.util.Iterator to loop through the selected keys
                       for (Iterator i = keys.iterator(); i.hasNext();) {
  -                        final SelectionKey key = (SelectionKey) i.next();
  +                        SelectionKey key = (SelectionKey) i.next();
   
                           if (key.isReadable()) {
                               log.trace("-OP_READ " + key);
                               key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
  +                            threadPool.getWorkManager().execute(new Event(key, SelectionKey.OP_READ));
                           }
                           if (key.isWritable()) {
                               log.trace("-OP_WRITE " + key);
                               key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
  +                            threadPool.getWorkManager().execute(new Event(key, SelectionKey.OP_WRITE));
                           }
                           if (key.isAcceptable()) {
                               log.trace("-OP_ACCEPT " + key);
                               key.interestOps(key.interestOps() & (~SelectionKey.OP_ACCEPT));
  +                            threadPool.getWorkManager().execute(new Event(key, SelectionKey.OP_ACCEPT));
                           }
   
  -                        threadPool.getWorkManager().execute(new Runnable() {
  -                            public void run() {
  -                                try {
  -                                    ((SelectionEventListner) key.attachment()).selectionEvent(key);
  -                                } catch (Throwable e) {
  -                                    log.trace("Request Failed.", e);
  -                                }
  -                            }
  -                        });
  -
                           i.remove(); // Remove the key from the set of selected keys
                       }
  -                    
  +
                   } catch (CancelledKeyException e) {
  -                    log.debug("Key has Been Cancelled: "+e);
  +                    log.debug("Key has Been Cancelled: " + e);
                   }
               }
           } catch (IOException e) {
               log.warn("IOException occured.", e);
           } catch (InterruptedException e) {
               log.debug("Selector Work thread has been interrupted.");
  -		} finally {
  +        } finally {
               log.debug("Selector Work thread has stopped.");
           }
       }
   
       public SelectionKey register(SelectableChannel selectableChannel, int ops, SelectionEventListner listener) throws ClosedChannelException {
  -        synchronized (guard) {
  +        synchronized (closing) {
               selector.wakeup();
               SelectionKey key = selectableChannel.register(selector, ops, listener);
               return key;
           }
       }
  +
       public void closeChannel(SelectableChannel selectableChannel) throws IOException {
  -        synchronized (guard) {
  +        synchronized (closing) {
               selector.wakeup();
  -            selectableChannel.keyFor(selector).cancel();
  -            selectableChannel.close();
  +            closing.push(selectableChannel);
           }
       }
   
       public void addInterestOps(SelectionKey selectorKey, int addOpts) {
  -        synchronized (guard) {
  +        synchronized (closing) {
               selector.wakeup();
  -            selectorKey.interestOps( selectorKey.interestOps() | addOpts );
  +            selectorKey.interestOps(selectorKey.interestOps() | addOpts);
           }
       }
   
  -    public void wakeup() {
  -        selector.wakeup();
  -    }
  -
       public void setGBeanContext(GBeanContext context) {
       }
   
  @@ -250,5 +276,40 @@
   
       public static GBeanInfo getGBeanInfo() {
           return GBEAN_INFO;
  +    }
  +
  +    public class Event implements Runnable {
  +
  +        final int flags;
  +        final SelectionKey key;
  +
  +        private Event(SelectionKey key, int flags) {
  +            this.flags = flags;
  +            this.key = key;
  +        }
  +
  +        public SelectionKey getSelectionKey() {
  +            return key;
  +        }
  +
  +        public final boolean isReadable() {
  +            return (flags & SelectionKey.OP_READ) != 0;
  +        }
  +
  +        public final boolean isWritable() {
  +            return (flags & SelectionKey.OP_WRITE) != 0;
  +        }
  +
  +        public final boolean isAcceptable() {
  +            return (flags & SelectionKey.OP_ACCEPT) != 0;
  +        }
  +
  +        public void run() {
  +            try {
  +                ((SelectionEventListner) key.attachment()).selectionEvent(this);
  +            } catch (Throwable e) {
  +                log.trace("Request Failed.", e);
  +            }
  +        }
       }
   }
  
  
  
  1.6       +2 -2      incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/DatagramProtocol.java
  
  Index: DatagramProtocol.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/DatagramProtocol.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- DatagramProtocol.java	25 Apr 2004 02:03:37 -0000	1.5
  +++ DatagramProtocol.java	4 May 2004 03:05:36 -0000	1.6
  @@ -207,7 +207,7 @@
   
       ByteBuffer receiveBuffer = ByteBuffer.allocate(65336);
   
  -    public synchronized void selectionEvent(SelectionKey selection) {
  +    public synchronized void selectionEvent(SelectorManager.Event event) {
           boolean tracing = log.isTraceEnabled();
   
           if (tracing) log.trace("ReadDataAction triggered.");
  
  
  
  1.8       +4 -4      incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/ServerSocketAcceptor.java
  
  Index: ServerSocketAcceptor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/ServerSocketAcceptor.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- ServerSocketAcceptor.java	1 May 2004 17:23:55 -0000	1.7
  +++ ServerSocketAcceptor.java	4 May 2004 03:05:36 -0000	1.8
  @@ -171,10 +171,10 @@
           state = STOPPED;        
       }
   
  -    public void selectionEvent(SelectionKey selection) {
  -        if (selection.isAcceptable()) {
  +    public void selectionEvent(SelectorManager.Event event) {
  +        if (event.isAcceptable()) {
               try {
  -                ServerSocketChannel server = (ServerSocketChannel) selection.channel();
  +                ServerSocketChannel server = (ServerSocketChannel) event.getSelectionKey().channel();
                   SocketChannel channel = server.accept();
   
                   if (channel == null) return;
  
  
  
  1.13      +9 -8      incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java
  
  Index: SocketProtocol.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/modules/network/src/java/org/apache/geronimo/network/protocol/SocketProtocol.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SocketProtocol.java	1 May 2004 23:16:37 -0000	1.12
  +++ SocketProtocol.java	4 May 2004 03:05:36 -0000	1.13
  @@ -185,7 +185,7 @@
   
           if (address == null && acceptedSocketChannel == null) throw new IllegalStateException("No address set");
   
  -        log.trace("Starting");
  +        log.trace("Starting "+ this);
           if (acceptedSocketChannel == null) {
               try {
                   socketChannel = SocketChannel.open();
  @@ -271,26 +271,27 @@
           }
       }
   
  -    public void selectionEvent(SelectionKey selection) {
  +    public void selectionEvent(SelectorManager.Event event) {
           try {
  -            if (selection.isReadable()) {
  +            if (event.isReadable()) {
                   synchronized (serviceReadMutex) {
                       serviceRead();
                   }
               }
  -            if (selection.isWritable()) {
  +            if (event.isWritable()) {
                   synchronized (serviceWriteMutex) {
                       serviceWrite();
                   }
               }
           } catch (CancelledKeyException e) {
  +            log.trace("CancelledKeyException " + e);
               // who knows, by the time we get here,
               // the key could have been canceled.
           }
       }
   
       private void serviceWrite() {
  -        log.trace("serviceWrite() triggered.");
  +        log.trace("serviceWrite() triggered " + selectionKey);
           try {
               if (sendBuffer == null) {
                   log.trace("Write had allready been serviced.");
  @@ -328,7 +329,7 @@
   
       public void serviceRead() {
           boolean tracing = log.isTraceEnabled();
  -        if (tracing) log.trace("serviceRead() triggered.");
  +        if (tracing) log.trace("serviceRead() triggered " + selectionKey);
           lastUsed = System.currentTimeMillis();
           try {
               while (true) {
  @@ -430,7 +431,7 @@
                   } catch (Throwable e) {
                       log.info("Closing error: ", e);
                   }
  -                log.trace("Closed");
  +                log.trace("Closed "+ this);
               }
               state = STOPPED;
           }
  
  
  

Mime
View raw message