directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: r109794 - in incubator/directory/seda/branches/trustin/src/java/org/apache/netty: common/util downstream/impl downstream/impl/tcp
Date Sat, 04 Dec 2004 09:56:28 GMT
Author: trustin
Date: Sat Dec  4 01:56:28 2004
New Revision: 109794

URL: http://svn.apache.org/viewcvs?view=rev&rev=109794
Log:
Implementing downstream...
Added:
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=auto&rev=109794
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Sat Dec  4 01:56:28 2004
@@ -0,0 +1,51 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.netty.common.util;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * TODO Insert type comment.
+ *
+ * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @author Trustin Lee (http://gleamynode.net/dev/)
+ */
+public class ByteBufferPool {
+    static final int DEFAULT_BUF_SIZE = 8192;
+    private static Queue buffers = new Queue(16);
+
+    static {
+        buffers.open();
+    }
+
+    public static synchronized ByteBuffer open() {
+        ByteBuffer buf = (ByteBuffer) buffers.pop();
+
+        if (buf == null) {
+            buf = ByteBuffer.allocateDirect(DEFAULT_BUF_SIZE);
+        } else {
+            buf.clear();
+        }
+
+        return buf;
+    }
+
+    public static synchronized void close(ByteBuffer buf) {
+        buffers.push(buf);
+    }
+}

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=auto&rev=109794
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Sat Dec  4 01:56:28 2004
@@ -0,0 +1,155 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $
+ */
+package org.apache.netty.common.util;
+
+import java.io.Serializable;
+
+import java.util.Arrays;
+
+
+/**
+ * <p>
+ * A simple queue class. This class is <b>NOT </b> thread-safe.
+ * </p>
+ *
+ * @author Trustin Lee (http://gleamynode.net/dev/)
+ *         href="http://projects.gleamynode.net/">http://projects.gleamynode.net/
+ *         </a>)
+ *
+ * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ */
+public class Queue implements Serializable {
+    private Object[] items;
+    private int first = 0;
+    private int last = 0;
+    private int size = 0;
+    private boolean open = false;
+
+    /**
+     * Construct a new, empty <code>Queue</code> with the specified initial
+     * capacity.
+     */
+    public Queue(int initialCapacity) {
+        items = new Object[initialCapacity];
+    }
+
+    public void open() {
+        clear();
+        open = true;
+    }
+
+    public void close() {
+        open = false;
+        clear();
+    }
+
+    /**
+     * Clears this queue.
+     */
+    private void clear() {
+        Arrays.fill(items, null);
+        first = 0;
+        last = 0;
+        size = 0;
+    }
+
+    /**
+     * Dequeues from this queue.
+     *
+     * @return <code>null</code>, if this queue is empty or the element is
+     *         really <code>null</code>.
+     */
+    public Object pop() {
+        if (size == 0) {
+            return null;
+        }
+
+        Object ret = items[first];
+        items[first] = null;
+        first = (first + 1) % items.length;
+
+        size--;
+
+        return ret;
+    }
+
+    /**
+     * Enqueue into this queue.
+     */
+    public boolean push(Object obj) {
+        if (!open) {
+            return false;
+        }
+
+        if (size == items.length) {
+            // expand queue
+            final int oldLen = items.length;
+            Object[] tmp = new Object[oldLen * 2];
+
+            if (first < last) {
+                System.arraycopy(items, first, tmp, 0, last - first);
+            } else {
+                System.arraycopy(items, first, tmp, 0, oldLen - first);
+                System.arraycopy(items, 0, tmp, oldLen - first, last);
+            }
+
+            first = 0;
+            last = oldLen;
+            items = tmp;
+        }
+
+        items[last] = obj;
+        last = (last + 1) % items.length;
+        size++;
+        return true;
+    }
+
+    /**
+     * Returns the first element of the queue.
+     *
+     * @return <code>null</code>, if the queue is empty, or the element is
+     *         really <code>null</code>.
+     */
+    public Object first() {
+        if (!open) {
+            return null;
+        }
+
+        if (size == 0) {
+            return null;
+        }
+
+        return items[first];
+    }
+
+    /**
+     * Returns <code>true</code> if the queue is empty.
+     */
+    public boolean isEmpty() {
+        return (size == 0);
+    }
+
+    /**
+     * Returns the number of elements in the queue.
+     */
+    public int size() {
+        return size;
+    }
+}

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java?view=auto&rev=109794
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
Sat Dec  4 01:56:28 2004
@@ -0,0 +1,145 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang.Validate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.netty.downstream.Acceptor;
+import org.apache.netty.downstream.SessionHandler;
+
+
+/**
+ * TODO Insert type comment.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class TcpAcceptor implements Acceptor {
+    private static volatile int nextId = 0;
+    private static final Log log = LogFactory.getLog(TcpAcceptor.class);
+    private final int id = nextId++;
+    private final Selector selector;
+    private final Map channels = new HashMap();
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     * @throws IOException
+     *
+     *
+     */
+    public TcpAcceptor() throws IOException {
+        selector = Selector.open();
+    }
+
+    public void bind(SocketAddress address, SessionHandler defaultHandler)
+              throws IOException {
+        this.bind(address, 50, defaultHandler);
+    }
+
+    public synchronized void bind(SocketAddress address, int backlog,
+                                  SessionHandler defaultHandler)
+                           throws IOException {
+        Validate.notNull(address);
+        Validate.notNull(defaultHandler);
+        if (!(address instanceof InetSocketAddress))
+            throw new IllegalArgumentException("Unexpected address type: "
+                                               + address.getClass());
+
+        ServerSocketChannel ssc = ServerSocketChannel.open();
+        ssc.configureBlocking(false);
+        ssc.socket().bind(address, backlog);
+        ssc.register(selector, SelectionKey.OP_ACCEPT, defaultHandler);
+
+        channels.put(address, ssc);
+
+        if (worker == null) {
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    public synchronized void unbind(SocketAddress address) {
+        Validate.notNull(address);
+
+        ServerSocketChannel ssc = (ServerSocketChannel) channels.get(address);
+
+        if (ssc == null)
+            throw new IllegalArgumentException("Unknown address: " + address);
+
+        SelectionKey key = ssc.keyFor(selector);
+        key.cancel();
+        channels.remove(address);
+
+        try {
+            ssc.close();
+        } catch (IOException e) {
+            log.error("Unexpected exception", e);
+        }
+    }
+
+    private class Worker extends Thread {
+        public Worker() {
+            super("TcpAcceptor-" + id);
+        }
+
+        public void run() {
+            for (;;) {
+                try {
+                    int nKeys = selector.select();
+
+                    if (nKeys == 0)
+                        continue;
+                    
+                    Iterator it = selector.selectedKeys().iterator();
+                    while (it.hasNext()) {
+                        SelectionKey key = (SelectionKey) it.next();
+                        it.remove();
+                        
+                        if (!key.isAcceptable())
+                            continue;
+                        
+                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
+                        SocketChannel ch = ssc.accept();
+                        if (ch == null)
+                            continue;
+                        
+                        TcpSession session = new TcpSession(ch, (SessionHandler) key.attachment());
+                        session.start();
+                    }
+                } catch (IOException e) {
+                    log.error("Unexpected exception.", e);
+                }
+            }
+        }
+    }
+}

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=auto&rev=109794
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Sat Dec  4 01:56:28 2004
@@ -0,0 +1,98 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.lang.Validate;
+import org.apache.netty.common.SessionConfig;
+import org.apache.netty.downstream.Session;
+import org.apache.netty.downstream.SessionHandler;
+
+/**
+ * TODO Insert type comment.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+class TcpSession implements Session {
+
+    private final SocketChannel ch;
+    private final SessionConfig config = new SimpleSessionConfig();
+    private SessionHandler handler;
+    
+    /**
+     * Creates a new instance.
+     * 
+     * 
+     */
+    TcpSession(SocketChannel ch, SessionHandler defaultHandler) {
+        this.ch = ch;
+        this.handler = defaultHandler;
+    }
+
+    public SessionHandler getHandler() {
+        return handler;
+    }
+
+    public void setHandler(SessionHandler handler) {
+        Validate.notNull(handler);
+        this.handler = handler;
+    }
+
+    public void close() {
+    }
+
+    public ByteBuffer getWriteBuffer() {
+        return null;
+    }
+
+    public void setMark(Object mark) {
+    }
+
+    public void flush() {
+    }
+
+    public boolean isConnected() {
+        return ch.isConnected();
+    }
+
+    public boolean isClosed() {
+        return !ch.isConnected();
+    }
+
+    public SessionConfig getConfig() {
+        return config;
+    }
+
+    public SocketAddress getRemoteAddress() {
+        return ch.socket().getRemoteSocketAddress();
+    }
+
+    public SocketAddress getLocalAddress() {
+        return ch.socket().getLocalSocketAddress();
+    }
+
+    public long getReadBytes() {
+        return readBytes;
+    }
+
+    public long getWrittenBytes() {
+        return writtenBytes;
+    }
+
+    public long getLastIoTime() {
+        return Math.max(lastReadtime, lastWriteTime);
+    }
+
+    public long getLastReadTime() {
+        return lastReadTime;
+    }
+
+    public long getLastWriteTime() {
+        return lastWriteTime;
+    }
+}

Mime
View raw message