directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r109861 - in incubator/directory/seda/branches/trustin/src: examples/org/apache/netty/examples/echo/server java/org/apache/netty/common/util java/org/apache/netty/downstream java/org/apache/netty/downstream/impl/tcp java/org/apache/netty/upstream
Date Sun, 05 Dec 2004 06:25:43 GMT
Author: trustin
Date: Sat Dec  4 22:25:42 2004
New Revision: 109861

URL: http://svn.apache.org/viewcvs?view=rev&rev=109861
Log:
 * Optimized TcpIoProcessor by using Queue instead of ArrayList
 * Fixed: problem in high traffic situation
 * Changed the interface of SessionHandler
   * added dataWritten
   * removed markRemoved
   * passes the number of read/written bytes
 * Changed the interface of Session
   * removed flush(mark)
   * added isIdle(...)

TODO:
 * Implement TcpConnector
 * Fix 100% CPU consumption when read buffer is full and user don't clear it.
Modified:
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java

Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
(original)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
Sat Dec  4 22:25:42 2004
@@ -34,15 +34,21 @@
 		cause.printStackTrace(System.out);
 	}
 
-	public void dataRead(Session session, ByteBuffer buf) {
-		System.out.println(session.getRemoteAddress() + ": READ (" + buf.remaining() + " B)");
-		session.getWriteBuffer().put(buf);
-		session.flush();
+	public void dataRead(Session session, ByteBuffer buf, int readBytes) {
+		System.out.println(session.getRemoteAddress() + ": READ (" + readBytes + " B)");
+		if (buf.remaining() <= session.getWriteBuffer().remaining()) {
+			session.getWriteBuffer().put(buf);
+			session.flush();
+		}
 	}
 
-	public void markRemoved(Session session, Object mark) {
-	}
-
-	public void writeBufferAvailable(Session session) {
+	public void dataWritten(Session session, ByteBuffer buf, int writtenBytes) {
+		System.out.println(session.getRemoteAddress() + ": WRITTEN (" + writtenBytes + "B)");
+		ByteBuffer readBuf = session.getReadBuffer();
+		System.out.println(readBuf.remaining() + " " + buf.remaining());
+		if (readBuf.remaining() <= buf.remaining()) {
+			buf.put(readBuf);
+			session.flush();
+		}
 	}
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Sat Dec  4 22:25:42 2004
@@ -26,18 +26,14 @@
  * @author Trustin Lee (http://gleamynode.net/dev/)
  */
 public class ByteBufferPool {
-    static final int DEFAULT_BUF_SIZE = 8192;
+    public static final int CAPACITY = 8192;
     private static Queue buffers = new Queue(16);
 
-    static {
-        buffers.open();
-    }
-
     public static synchronized ByteBuffer open() {
         ByteBuffer buf = (ByteBuffer) buffers.pop();
 
         if (buf == null) {
-            buf = ByteBuffer.allocateDirect(DEFAULT_BUF_SIZE);
+            buf = ByteBuffer.allocateDirect(CAPACITY);
         } else {
             buf.clear();
         }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Sat Dec  4 22:25:42 2004
@@ -40,7 +40,6 @@
     private int first = 0;
     private int last = 0;
     private int size = 0;
-    private boolean open = false;
 
     /**
      * Construct a new, empty <code>Queue</code> with the specified initial
@@ -50,20 +49,10 @@
         items = new Object[initialCapacity];
     }
 
-    public void open() {
-        clear();
-        open = true;
-    }
-
-    public void close() {
-        open = false;
-        clear();
-    }
-
     /**
      * Clears this queue.
      */
-    private void clear() {
+    public void clear() {
         Arrays.fill(items, null);
         first = 0;
         last = 0;
@@ -93,11 +82,7 @@
     /**
      * Enqueue into this queue.
      */
-    public boolean push(Object obj) {
-        if (!open) {
-            return false;
-        }
-
+    public void push(Object obj) {
         if (size == items.length) {
             // expand queue
             final int oldLen = items.length;
@@ -118,7 +103,6 @@
         items[last] = obj;
         last = (last + 1) % items.length;
         size++;
-        return true;
     }
 
     /**
@@ -128,10 +112,6 @@
      *         really <code>null</code>.
      */
     public Object first() {
-        if (!open) {
-            return null;
-        }
-
         if (size == 0) {
             return null;
         }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
Sat Dec  4 22:25:42 2004
@@ -20,9 +20,9 @@
 package org.apache.netty.downstream;
 
 import java.net.SocketAddress;
-
 import java.nio.ByteBuffer;
 
+import org.apache.netty.common.IdleStatus;
 import org.apache.netty.common.SessionConfig;
 
 
@@ -42,10 +42,8 @@
     ByteBuffer getReadBuffer();
 
     ByteBuffer getWriteBuffer();
-
-    void flush();
     
-    void flush(Object mark);
+    void flush();
 
     boolean isConnected();
 
@@ -66,4 +64,6 @@
     long getLastReadTime();
 
     long getLastWriteTime();
+    
+    boolean isIdle(IdleStatus status);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
Sat Dec  4 22:25:42 2004
@@ -39,9 +39,7 @@
 
     void exceptionCaught(Session session, Throwable cause);
 
-    void dataRead(Session session, ByteBuffer buf);
+    void dataRead(Session session, ByteBuffer readBuf, int readBytes);
 
-    void markRemoved(Session session, Object mark);
-    
-    void writeBufferAvailable(Session session);
+    void dataWritten(Session session, ByteBuffer writeBuf, int writtenBytes);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
Sat Dec  4 22:25:42 2004
@@ -1,4 +1,20 @@
 /*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
  * @(#) $Id$
  */
 package org.apache.netty.downstream.impl.tcp;
@@ -8,291 +24,384 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.common.SessionConfig;
+import org.apache.netty.common.util.ByteBufferPool;
+import org.apache.netty.common.util.Queue;
+
 
 /**
  * TODO Document me.
- * TODO Implement idleTime/bufferWritable/
+ * TODO Implement markRemoved
+ * 
  * @author Trustin Lee (trustin@apache.org)
- * @version $Rev$, $Date$, 
+ * @version $Rev$, $Date$,
  */
 class TcpIoProcessor {
-	private static final Log log = LogFactory.getLog(TcpIoProcessor.class);
-	private static final TcpIoProcessor instance;
-	
-	static {
-		TcpIoProcessor tmp;
-		try {
-			tmp = new TcpIoProcessor();
-		} catch (IOException e) {
-			tmp = null;
-		}
-		
-		instance = tmp;
-	}
-	
-	public static TcpIoProcessor getInstance() throws IOException {
-		if (instance == null)
-			throw new IOException("Failed to open selector.");
-		return instance;
-	}
-	
-	private final Selector selector;
-	private final List newSessions = new ArrayList();
-	private final List removingSessions = new ArrayList();
-	private final List flushingSessions = new ArrayList();
-	private Worker worker;
-	
-	private TcpIoProcessor() throws IOException {
-		selector = Selector.open();
-	}
-	
-	public void addSession(TcpSession session) {
-		if (worker == null) {
-			synchronized (this) {
-				if (worker == null) {
-					worker = new Worker();
-					worker.start();
-				}
-			}
-		}
-		
-		synchronized (newSessions) {
-			newSessions.add(session);
-		}
-		selector.wakeup();
-	}
-	
-	public void removeSession(TcpSession session) {
-		synchronized (removingSessions) {
-			removingSessions.add(session);
-		}
-		selector.wakeup();
-	}
-	
-	public void flushSession(TcpSession session) {
-		synchronized (flushingSessions) {
-			flushingSessions.add(session);
-		}
-		selector.wakeup();
-	}
-	
-	private class Worker extends Thread {
-		public Worker() {
-			super("TcpIoProcessor");
-			setDaemon(true);
-		}
-		
-		public void run() {
-			for (;;) {
-				try {
-					int nKeys = selector.select();
-					addSessions();
-					if (nKeys > 0) {
-						processSessions(selector.selectedKeys());
-					}
-					flushSessions();
-					removeSessions();
-				} catch (IOException e) {
-					log.error("Unexpected exception.", e);
-					try {
-						Thread.sleep(1000);
-					} catch (InterruptedException e1) {
-					}
-				}
-			}
-		}
-	}
-	
-	private void addSessions() {
-		if (newSessions.size() == 0)
-			return;
-		
-		synchronized (newSessions) {
-			Iterator it = newSessions.iterator();
-			while (it.hasNext()) {
-				TcpSession session = (TcpSession) it.next();
-				SocketChannel ch = session.getChannel();
-				boolean registered;
-				try {
-					ch.configureBlocking(false);
-					session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
-					registered = true;
-				} catch (IOException e) {
-					registered = false;
-					fireExceptionCaught(session, e);
-				}
-				
-				if (registered) {
-					fireSessionOpened(session);
-				}
-			}
-			
-			newSessions.clear();
-		}
-	}
-
-	private void removeSessions() {
-		if (removingSessions.size() == 0)
-			return;
-
-		synchronized (removingSessions) {
-			Iterator it = removingSessions.iterator();
-			while (it.hasNext()) {
-				TcpSession session = (TcpSession) it.next();
-				SocketChannel ch = session.getChannel();
-				session.getSelectionKey().cancel();
-				session.dispose();
-				try {
-					ch.close();
-				} catch (IOException e) {
-					fireExceptionCaught(session, e);
-				} finally {
-					fireSessionClosed(session);
-				}
-			}
-			
-			removingSessions.clear();
-		}
-	}
-	
-	private void processSessions(Set selectedKeys) {
-		Iterator it = selectedKeys.iterator();
-		while (it.hasNext()) {
-			SelectionKey key = (SelectionKey) it.next();
-			it.remove();
-			TcpSession session = (TcpSession) key.attachment();
-			if (key.isReadable()) {
-				read(session);
-			} else if (key.isWritable()) {
-				scheduleFlush(session);
-			}
-		}
-		selectedKeys.clear();
-	}
-	
-	private void read(TcpSession session) {
-		ByteBuffer readBuf = session.getReadBuffer();
-		SocketChannel ch = session.getChannel();
-		try {
-			int readBytes = 0;
-			int ret;
-			
-			synchronized (readBuf) {
-				while ((ret = ch.read(readBuf)) > 0) {
-					readBytes += ret;
-				}
-				
-				if (readBytes > 0) {
-					session.increaseReadBytes(readBytes);
-					readBuf.flip();
-					fireDataRead(session, readBuf);
-					readBuf.compact();
-				}
-			}
-			
-			if (ret < 0) {
-				scheduleRemove(session);
-			}
-		} catch (IOException e) {
-			fireExceptionCaught(session, e);
-		}
-	}
-	
-	private void scheduleRemove(TcpSession session) {
-		synchronized (removingSessions) {
-			removingSessions.add(session);
-		}
-	}
-
-	private void scheduleFlush(TcpSession session) {
-		session.getSelectionKey().interestOps(SelectionKey.OP_READ);
-		synchronized (flushingSessions) {
-			flushingSessions.add(session);
-		}
-	}
-
-	private void flushSessions() {
-		if (flushingSessions.size() == 0)
-			return;
-
-		synchronized (flushingSessions) {
-			Iterator it = flushingSessions.iterator();
-			while (it.hasNext()) {
-				TcpSession session = (TcpSession) it.next();
-				if (session.isClosed())
-					continue;
-
-				flush(session);
-			}
-			
-			flushingSessions.clear();
-		}
-	}
-	
-	private void flush(TcpSession session) {
-		ByteBuffer writeBuf = session.getWriteBuffer();
-		SocketChannel ch = session.getChannel();
-		
-		try {
-			synchronized (writeBuf) {
-				writeBuf.flip();
-				int nBytes = ch.write(writeBuf);
-				if (nBytes > 0)
-					session.increaseWrittenBytes(nBytes);
-
-				int remaining = writeBuf.remaining();
-				if (remaining > 0){
-					// Kernel buffer is full
-					session.getSelectionKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
-				}
-
-				writeBuf.compact();
-			}
-		} catch (IOException e) {
-			fireExceptionCaught(session, e);
-		}
-	}
-
-	private void fireSessionOpened(TcpSession session) {
-		try {
-			session.getHandler().sessionOpened(session);
-		} catch (Throwable e) {
-			fireExceptionCaught(session, e);
-		}
-	}
-	
-	private void fireSessionClosed(TcpSession session) {
-		try {
-			session.getHandler().sessionClosed(session);
-		} catch (Throwable e) {
-			fireExceptionCaught(session, e);
-		}
-	}
-	
-	private void fireDataRead(TcpSession session, ByteBuffer readBuf) {
-		try {
-			session.getHandler().dataRead(session, readBuf);
-		} catch (Throwable e) {
-			fireExceptionCaught(session, e);
-		}
-	}
-	
-	private void fireExceptionCaught(TcpSession session, Throwable cause) {
-		try {
-			session.getHandler().exceptionCaught(session, cause);
-			if (cause instanceof IOException) {
-				scheduleRemove(session);
-			}
-		} catch (Throwable t) {
-			log.error("Exception from excaptionCaught.", t);
-		}
-	}
+    private static final Log log = LogFactory.getLog(TcpIoProcessor.class);
+    private static final TcpIoProcessor instance;
+
+    static {
+        TcpIoProcessor tmp;
 
+        try {
+            tmp = new TcpIoProcessor();
+        } catch (IOException e) {
+            log.fatal("Failed to open selector.", e);
+            tmp = null;
+        }
+
+        instance = tmp;
+    }
+
+    private final Selector selector;
+    private final Queue newSessions = new Queue(16);
+    private final Queue removingSessions = new Queue(16);
+    private final Queue flushingSessions = new Queue(16);
+    private Worker worker;
+    private long lastIdleCheckTime = System.currentTimeMillis();
+
+    private TcpIoProcessor() throws IOException {
+        selector = Selector.open();
+    }
+
+    public static TcpIoProcessor getInstance() {
+        return instance;
+    }
+
+    public void addSession(TcpSession session) {
+        if (worker == null) {
+            synchronized (this) {
+                if (worker == null) {
+                    worker = new Worker();
+                    worker.start();
+                }
+            }
+        }
+
+        synchronized (newSessions) {
+            newSessions.push(session);
+        }
+
+        selector.wakeup();
+    }
+
+    public void removeSession(TcpSession session) {
+    	scheduleRemove(session);
+        selector.wakeup();
+    }
+
+    public void flushSession(TcpSession session) {
+    	scheduleFlush(session);
+        selector.wakeup();
+    }
+
+    private void addSessions() {
+        if (newSessions.size() == 0)
+            return;
+
+        TcpSession session;
+
+        for (;;) {
+            synchronized (newSessions) {
+                session = (TcpSession) newSessions.pop();
+            }
+
+            if (session == null)
+                break;
+
+            SocketChannel ch = session.getChannel();
+            boolean registered;
+
+            try {
+                ch.configureBlocking(false);
+                ch.socket().setSendBufferSize(ByteBufferPool.CAPACITY);
+                ch.socket().setReceiveBufferSize(ByteBufferPool.CAPACITY);
+                session.setSelectionKey(ch.register(selector,
+                                                    SelectionKey.OP_READ,
+                                                    session));
+                registered = true;
+            } catch (IOException e) {
+                registered = false;
+                fireExceptionCaught(session, e);
+            }
+
+            if (registered) {
+                fireSessionOpened(session);
+            }
+        }
+    }
+
+    private void removeSessions() {
+        if (removingSessions.size() == 0)
+            return;
+
+        for (;;) {
+            TcpSession session;
+
+            synchronized (removingSessions) {
+                session = (TcpSession) removingSessions.pop();
+            }
+
+            if (session == null)
+                break;
+
+            SocketChannel ch = session.getChannel();
+            session.getSelectionKey().cancel();
+            session.dispose();
+
+            try {
+                ch.close();
+            } catch (IOException e) {
+                fireExceptionCaught(session, e);
+            } finally {
+                fireSessionClosed(session);
+            }
+        }
+    }
+
+    private void processSessions(Set selectedKeys) {
+        Iterator it = selectedKeys.iterator();
+
+        while (it.hasNext()) {
+            SelectionKey key = (SelectionKey) it.next();
+            TcpSession session = (TcpSession) key.attachment();
+
+            if (key.isReadable()) {
+                read(session);
+            }
+            
+            if (key.isWritable()) {
+                scheduleFlush(session);
+            }
+        }
+
+        selectedKeys.clear();
+    }
+
+    private void read(TcpSession session) {
+        ByteBuffer readBuf = session.getReadBuffer();
+        SocketChannel ch = session.getChannel();
+
+        try {
+            int readBytes = 0;
+            int ret;
+
+            synchronized (readBuf) {
+                readBuf.compact();
+                try {
+	                while ((ret = ch.read(readBuf)) > 0) {
+	                    readBytes += ret;
+	                }
+                } finally {
+                	readBuf.flip();
+                }
+                
+                session.increaseReadBytes(readBytes);
+                if (ret >= 0)
+                	fireDataRead(session, readBuf, readBytes);
+                else
+                    scheduleRemove(session);
+            }
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void scheduleRemove(TcpSession session) {
+        synchronized (removingSessions) {
+            removingSessions.push(session);
+        }
+    }
+
+    private void scheduleFlush(TcpSession session) {
+        synchronized (flushingSessions) {
+            flushingSessions.push(session);
+        }
+    }
+
+    private void notifyIdleSessions() {
+        Set keys = selector.keys();
+        Iterator it;
+        TcpSession session;
+
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+
+        if ((keys != null) && ((currentTime - lastIdleCheckTime) >= 1000)) {
+            lastIdleCheckTime = currentTime;
+            it = keys.iterator();
+
+            while (it.hasNext()) {
+                SelectionKey key = (SelectionKey) it.next();
+                session = (TcpSession) key.attachment();
+
+                notifyIdleSession(session, currentTime);
+            }
+        }
+    }
+
+    private void notifyIdleSession(TcpSession session, long currentTime) {
+        SessionConfig config = session.getConfig();
+        notifyIdleSession0(session, currentTime,
+                           config.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+                           IdleStatus.BOTH_IDLE, session.getLastIoTime());
+        notifyIdleSession0(session, currentTime,
+                           config.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+                           IdleStatus.READER_IDLE, session.getLastReadTime());
+        notifyIdleSession0(session, currentTime,
+                           config.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+                           IdleStatus.WRITER_IDLE, session.getLastWriteTime());
+    }
+
+    private void notifyIdleSession0(TcpSession session, long currentTime,
+                                    long idleTime, IdleStatus status,
+                                    long lastIoTime) {
+        if (idleTime > 0 && session.isIdle(status) &&
+                (currentTime - lastIoTime) >= idleTime) {
+            session.setIdle(status);
+            fireSessionIdle(session, status);
+        }
+    }
+
+    private void flushSessions() {
+        if (flushingSessions.size() == 0)
+            return;
+
+        for (;;) {
+            TcpSession session;
+
+            synchronized (flushingSessions) {
+                session = (TcpSession) flushingSessions.pop();
+            }
+
+            if (session == null)
+                break;
+
+            if (session.isClosed())
+                continue;
+
+            flush(session);
+        }
+    }
+
+    private void flush(TcpSession session) {
+        ByteBuffer writeBuf = session.getWriteBuffer();
+        SocketChannel ch = session.getChannel();
+
+        try {
+            synchronized (writeBuf) {
+                writeBuf.flip();
+                int writtenBytes;
+                try {
+	                writtenBytes = ch.write(writeBuf);
+                } finally {
+	                if (writeBuf.hasRemaining()) {
+	                    // Kernel buffer is full
+	                    session.getSelectionKey().interestOps(SelectionKey.OP_READ |
+	                                                          SelectionKey.OP_WRITE);
+	                } else {
+	                    session.getSelectionKey().interestOps(SelectionKey.OP_READ);
+	                }
+
+                	writeBuf.compact();
+                }
+                
+                if (writtenBytes > 0) {
+	                session.increaseWrittenBytes(writtenBytes);
+	                fireDataWritten(session, writeBuf, writtenBytes);
+                }
+            }
+        } catch (IOException e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireSessionOpened(TcpSession session) {
+        try {
+            session.getHandler().sessionOpened(session);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireSessionClosed(TcpSession session) {
+        try {
+            session.getHandler().sessionClosed(session);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireSessionIdle(TcpSession session, IdleStatus status) {
+        try {
+            session.getHandler().sessionIdle(session, status);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireDataRead(TcpSession session, ByteBuffer readBuf, int readBytes) {
+        try {
+            session.getHandler().dataRead(session, readBuf, readBytes);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireDataWritten(TcpSession session, ByteBuffer writeBuf, int writtenBytes)
{
+        try {
+            session.getHandler().dataWritten(session, writeBuf, writtenBytes);
+        } catch (Throwable e) {
+            fireExceptionCaught(session, e);
+        }
+    }
+
+    private void fireExceptionCaught(TcpSession session, Throwable cause) {
+        try {
+            session.getHandler().exceptionCaught(session, cause);
+
+            if (cause instanceof IOException) {
+                scheduleRemove(session);
+            }
+        } catch (Throwable t) {
+            log.error("Exception from excaptionCaught.", t);
+        }
+    }
+
+    private class Worker extends Thread {
+        public Worker() {
+            super("TcpIoProcessor");
+            setDaemon(true);
+        }
+
+        public void run() {
+            for (;;) {
+                try {
+                    int nKeys = selector.select(1000);
+                    addSessions();
+
+                    if (nKeys > 0) {
+                        processSessions(selector.selectedKeys());
+                    }
+
+                    flushSessions();
+                    removeSessions();
+                    notifyIdleSessions();
+                } catch (IOException e) {
+                    log.error("Unexpected exception.", e);
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                    }
+                }
+            }
+        }
+    }
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Sat Dec  4 22:25:42 2004
@@ -3,13 +3,13 @@
  */
 package org.apache.netty.downstream.impl.tcp;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import org.apache.commons.lang.Validate;
+import org.apache.netty.common.IdleStatus;
 import org.apache.netty.common.SessionConfig;
 import org.apache.netty.common.util.ByteBufferPool;
 import org.apache.netty.downstream.Session;
@@ -34,6 +34,9 @@
 	private long writtenBytes;
 	private long lastReadTime;
 	private long lastWriteTime;
+	private boolean idleForBoth;
+	private boolean idleForRead;
+	private boolean idleForWrite;
     
     /**
      * Creates a new instance.
@@ -41,7 +44,7 @@
     TcpSession(SocketChannel ch, SessionHandler defaultHandler) {
         this.ch = ch;
         this.config = new TcpSessionConfig(ch);
-        this.readBuf = ByteBufferPool.open();
+        this.readBuf = (ByteBuffer) ByteBufferPool.open().limit(0);
         this.writeBuf = ByteBufferPool.open();
         this.handler = defaultHandler;
     }
@@ -73,11 +76,7 @@
     }
 
     public void close() {
-    	try {
-			TcpIoProcessor.getInstance().removeSession(this);
-		} catch (IOException e) {
-			// This cannot happen
-		}
+		TcpIoProcessor.getInstance().removeSession(this);
     }
     
     public ByteBuffer getReadBuffer() {
@@ -89,19 +88,7 @@
     }
     
     public void flush() {
-    	try {
-			TcpIoProcessor.getInstance().flushSession(this);
-		} catch (IOException e) {
-			// This cannot happen
-		}
-    }
-
-    public void flush(Object mark) {
-    	try {
-			TcpIoProcessor.getInstance().flushSession(this);
-		} catch (IOException e) {
-			// This cannot happen
-		}
+		TcpIoProcessor.getInstance().flushSession(this);
     }
 
     public boolean isConnected() {
@@ -153,4 +140,28 @@
     public long getLastWriteTime() {
         return lastWriteTime;
     }
+
+	public boolean isIdle(IdleStatus status) {
+        if (status == IdleStatus.BOTH_IDLE)
+            return idleForBoth;
+
+        if (status == IdleStatus.READER_IDLE)
+            return idleForRead;
+
+        if (status == IdleStatus.WRITER_IDLE)
+            return idleForWrite;
+        
+        throw new IllegalArgumentException("Unknown idle status: " + status);
+	}
+	
+	void setIdle(IdleStatus status) {
+        if (status == IdleStatus.BOTH_IDLE)
+            idleForBoth = true;
+        else if (status == IdleStatus.READER_IDLE)
+            idleForRead = true;
+        else if (status == IdleStatus.WRITER_IDLE)
+            idleForWrite = true;
+        else
+        	throw new IllegalArgumentException("Unknown idle status: " + status);
+	}
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java
Sat Dec  4 22:25:42 2004
@@ -21,6 +21,7 @@
 
 import java.net.SocketAddress;
 
+import org.apache.netty.common.IdleStatus;
 import org.apache.netty.common.SessionConfig;
 
 
@@ -43,6 +44,8 @@
 
     boolean write(Object message);
 
+    void notifyWhenWritable();
+
     boolean isConnected();
 
     boolean isClosed();
@@ -58,4 +61,6 @@
     long getLastReadTime();
 
     long getLastWriteTime();
+    
+    boolean isIdle(IdleStatus status);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109861
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Sat Dec  4 22:25:42 2004
@@ -40,4 +40,6 @@
     void messageReceived(Session session, Object message);
 
     void messageSent(Session session, Object message);
+    
+    void sessionWritable(Session session);
 }

Mime
View raw message