directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trus...@apache.org
Subject svn commit: rev 56129 - in incubator/directory/seda/trunk: . src/java/org/apache/seda src/java/org/apache/seda/decoder src/java/org/apache/seda/encoder src/java/org/apache/seda/output src/java/org/apache/seda/protocol src/java/org/apache/seda/stage src/java/org/apache/seda/thread
Date Sun, 31 Oct 2004 07:50:59 GMT
Author: trustin
Date: Sun Oct 31 00:50:58 2004
New Revision: 56129

Added:
   incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java
Modified:
   incubator/directory/seda/trunk/project.xml
   incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
Log:
* Removed dependency on commons-threadpool; now uses home-grown thread pools.

* Modified ThreadPool.execute() signature.
InputEvent and OutputEvent must be passed to DecoderManager and OutputManager, respectively,
in order.  I added a 'hint' parameter (usually the 'ClientKey') so that the newly added
OrderedThreadPool can dispatch events for the same client to the same worker.



Modified: incubator/directory/seda/trunk/project.xml
==============================================================================
--- incubator/directory/seda/trunk/project.xml	(original)
+++ incubator/directory/seda/trunk/project.xml	Sun Oct 31 00:50:58 2004
@@ -117,13 +117,7 @@
     <!-- Dependencies required for running test cases and examples -->
     <!-- ========================================================= -->
 
-    <dependency>
-      <groupId>commons-threadpool</groupId>
-      <artifactId>commons-threadpool</artifactId>
-      <version>1.0-dev</version>
-      <url>http://jakarta.apache.org/commons/sandbox/threadpool</url>
-    </dependency>
-
+    
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/DefaultFrontendFactory.java	Sun Oct 31 00:50:58 2004
@@ -19,8 +19,6 @@
 
 import java.io.IOException;
 
-import org.apache.commons.threadpool.CommonsLoggingThreadPoolMonitor;
-import org.apache.commons.threadpool.DefaultThreadPool;
 import org.apache.seda.buffer.BufferPool;
 import org.apache.seda.buffer.DefaultBufferPool;
 import org.apache.seda.buffer.DefaultBufferPoolConfig;
@@ -40,8 +38,15 @@
 import org.apache.seda.listener.TCPListenerManager;
 import org.apache.seda.output.OutputManager;
 import org.apache.seda.output.TCPOutputManager;
-import org.apache.seda.protocol.*;
+import org.apache.seda.protocol.DefaultInetServicesDatabase;
+import org.apache.seda.protocol.DefaultRequestProcessor;
+import org.apache.seda.protocol.InetServiceEntry;
+import org.apache.seda.protocol.InetServicesDatabase;
+import org.apache.seda.protocol.RequestProcessor;
 import org.apache.seda.stage.DefaultStageConfig;
+import org.apache.seda.thread.AbstractThreadPool;
+import org.apache.seda.thread.OrderedThreadPool;
+import org.apache.seda.thread.SimpleThreadPool;
 import org.apache.seda.thread.ThreadPool;
 
 
@@ -110,19 +115,16 @@
     }
 
     // no deps
-    private ThreadPool createThreadPool(int threads)
+    private ThreadPool createThreadPool(int threads, boolean ordered)
     {
-        CommonsLoggingThreadPoolMonitor monitor =
-            new CommonsLoggingThreadPoolMonitor();
-        final DefaultThreadPool ctp = new DefaultThreadPool(monitor, threads);
-        ThreadPool pool =
-            new ThreadPool()
-            {
-                public void execute(Runnable runnable)
-                {
-                    ctp.invokeLater(runnable);
-                }
-            };
+        AbstractThreadPool pool;
+        if (ordered) {
+        	pool = new OrderedThreadPool();
+        } else {
+        	pool = new SimpleThreadPool();
+        }
+
+        pool.setThreadPoolSize(threads);
 
         return pool;
     }
@@ -133,7 +135,7 @@
                                                 InetServicesDatabase inetDb)
     {
         DefaultStageConfig config =
-            new DefaultStageConfig("decoderManager", createThreadPool(3));
+            new DefaultStageConfig("decoderManager", createThreadPool(3, true));
         DefaultDecoderManager decMan =
             new DefaultDecoderManager(router, config, inetDb);
         DecodeStageHandler handler = new DecodeStageHandler(decMan);
@@ -148,7 +150,7 @@
                                                 InetServicesDatabase inetDb)
     {
         DefaultStageConfig config =
-            new DefaultStageConfig("encoderManager", createThreadPool(3));
+            new DefaultStageConfig("encoderManager", createThreadPool(3, false));
         DefaultEncoderManager encMan =
             new DefaultEncoderManager(router, config, inetDb);
         EncodeStageHandler handler = new EncodeStageHandler(encMan);
@@ -176,7 +178,7 @@
     private OutputManager createOutputManager(EventRouter router)
     {
         DefaultStageConfig config =
-            new DefaultStageConfig("outputManager", createThreadPool(3));
+            new DefaultStageConfig("outputManager", createThreadPool(3, true));
         TCPOutputManager outMan = new TCPOutputManager(router, config);
         outMan.start();
         return outMan;
@@ -187,7 +189,7 @@
                                                     InetServicesDatabase inetDb)
     {
         DefaultStageConfig config =
-            new DefaultStageConfig("requestProcessor", createThreadPool(3));
+            new DefaultStageConfig("requestProcessor", createThreadPool(3, false));
         DefaultRequestProcessor reqProc =
             new DefaultRequestProcessor(router, config, inetDb);
         reqProc.start();

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	Sun Oct 31 00:50:58 2004
@@ -18,7 +18,6 @@
 package org.apache.seda.decoder;
 
 import java.nio.ByteBuffer;
-
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,7 +26,18 @@
 import org.apache.commons.codec.stateful.DecoderCallback;
 import org.apache.commons.codec.stateful.DecoderFactory;
 import org.apache.commons.codec.stateful.StatefulDecoder;
-import org.apache.seda.event.*;
+import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.AddProtocolEvent;
+import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.ConnectSubscriber;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.event.InputSubscriber;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
+import org.apache.seda.event.RequestEvent;
 import org.apache.seda.listener.ClientKey;
 import org.apache.seda.listener.KeyExpiryException;
 import org.apache.seda.protocol.InetServicesDatabase;
@@ -76,7 +86,7 @@
         this.router = router;
         this.inetdb = inetdb;
         this.monitor = new DecoderManagerMonitorAdapter();
-        super.setMonitor(new LoggingStageMonitor(getClass()));
+        super.setStageMonitor(new LoggingStageMonitor(getClass()));
 
         router.subscribe(InputEvent.class, this);
         router.subscribe(ConnectEvent.class, this);

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	Sun Oct 31 00:50:58 2004
@@ -65,12 +65,11 @@
      *
      * @param router the event router used to publish and subscribe to events on
      */
-    public DefaultEncoderManager(
-                                 EventRouter router, StageConfig config,
+    public DefaultEncoderManager(EventRouter router, StageConfig config,
                                  InetServicesDatabase inetdb)
     {
         super(config);
-        super.setMonitor(new LoggingStageMonitor(this.getClass()));
+        super.setStageMonitor(new LoggingStageMonitor(this.getClass()));
         monitor = new EncoderManagerMonitorAdapter();
         this.inetdb = inetdb;
         this.router = router;
@@ -118,8 +117,7 @@
     {
         if (event instanceof AddProtocolEvent)
         {
-            factories.put(
-                          event.getProtocolProvider().getName(),
+            factories.put(event.getProtocolProvider().getName(),
                           event.getProtocolProvider().getEncoderFactory());
         }
     }
@@ -135,7 +133,12 @@
                                    throws KeyExpiryException
     {
         String proto = inetdb.getProtoByPort(key.getLocalAddress().getPort());
-        EncoderFactory factory = (EncoderFactory) factories.get(proto);
+        EncoderFactory factory;
+
+        // FIXME Event synchronization
+        while ((factory = (EncoderFactory) factories.get(proto)) == null)
+            continue;
+
         return factory.createEncoder();
     }
 
@@ -166,8 +169,7 @@
          */
         encoder.setCallback(new EncoderCallback()
             {
-                public void encodeOccurred(
-                                           StatefulEncoder encoder,
+                public void encodeOccurred(StatefulEncoder encoder,
                                            Object encoded)
                 {
                     ClientKey key = ((ClientEncoder) encoder).getClientKey();
@@ -202,8 +204,7 @@
         final Object[] encoded = new Object[1];
         encoder.setCallback(new EncoderCallback()
             {
-                public void encodeOccurred(
-                                           StatefulEncoder encoder, Object obj)
+                public void encodeOccurred(StatefulEncoder encoder, Object obj)
                 {
                     encoded[0] = obj;
                 }
@@ -214,8 +215,8 @@
         // the encoded value should be set
         if (encoded[0] == null)
         {
-            throw new EncoderException("Expected a complete encoded object"
-                                       + " but encoder did not produce one");
+            throw new EncoderException("Expected a complete encoded object" +
+                                       " but encoder did not produce one");
         }
 
         return (ByteBuffer) encoded[0];

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/EncodeStageHandler.java	Sun Oct 31 00:50:58 2004
@@ -50,7 +50,11 @@
     {
         ResponseEvent re = (ResponseEvent) event;
         ClientKey key = re.getClientKey();
-        StatefulEncoder encoder = encMan.getEncoder(key);
+        StatefulEncoder encoder;
+        
+        // FIXME Event synchronization issues
+        while ((encoder = encMan.getEncoder(key)) == null)
+        	continue;
 
         try
         {

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/TCPOutputManager.java	Sun Oct 31 00:50:58 2004
@@ -18,10 +18,8 @@
 package org.apache.seda.output;
 
 import java.io.IOException;
-
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.Map;
@@ -81,7 +79,7 @@
         this.router.subscribe(ConnectEvent.class, this);
         this.router.subscribe(DisconnectEvent.class, this);
         config.setHandler(new OutputStageHandler());
-        this.setMonitor(new LoggingStageMonitor());
+        this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
     }
 
@@ -213,8 +211,7 @@
                 }
                 catch (IOException e)
                 {
-                    monitor.failedOnWrite(
-                                          TCPOutputManager.this,
+                    monitor.failedOnWrite(TCPOutputManager.this,
                                           event.getClientKey(), e);
                 }
             }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/UDPOutputManager.java	Sun Oct 31 00:50:58 2004
@@ -81,7 +81,7 @@
         this.router.subscribe(ConnectEvent.class, this);
         this.router.subscribe(DisconnectEvent.class, this);
         config.setHandler(new OutputStageHandler());
-        this.setMonitor(new LoggingStageMonitor());
+        this.setStageMonitor(new LoggingStageMonitor());
         this.setOutputMonitor(new LoggingOutputMonitor());
     }
 

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	Sun Oct 31 00:50:58 2004
@@ -56,7 +56,7 @@
 
         DefaultStageConfig defaultConfig = (DefaultStageConfig) config;
         defaultConfig.setHandler(new ProcessorStageHandler());
-        super.setMonitor(new LoggingStageMonitor(getClass()));
+        super.setStageMonitor(new LoggingStageMonitor(getClass()));
 
         this.inetDb = inetDb;
         this.router = router;

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/stage/DefaultStage.java	Sun Oct 31 00:50:58 2004
@@ -22,6 +22,9 @@
 import java.util.LinkedList;
 import java.util.Set;
 
+import org.apache.seda.event.ClientEvent;
+import org.apache.seda.listener.ClientKey;
+
 
 /**
  * The default Stage implementation.
@@ -204,7 +207,7 @@
      *
      * @param monitor the monitor to set for this Stage
      */
-    public void setMonitor(StageMonitor monitor)
+    public void setStageMonitor(StageMonitor monitor)
     {
         this.monitor = monitor;
     }
@@ -218,7 +221,7 @@
      *
      * @author <a href="mailto:aok123@bellsouth.net">Alex Karasulu</a>
      * @author $Author$
-     * @version $Revision$
+     * @version $Revision: 56106 $
      */
     class StageDriver implements Runnable
     {
@@ -253,7 +256,7 @@
                             }
                             catch (Exception e2)
                             {
-                                 /*NOT THROWN*/
+                                /*NOT THROWN*/
                             }
 
                             monitor.driverFailed(DefaultStage.this, e);
@@ -264,8 +267,12 @@
                         EventObject event = (EventObject) queue.removeLast();
                         monitor.eventDequeued(DefaultStage.this, event);
 
+                        ClientKey key = null;
+                        if (event instanceof ClientEvent) {
+                        	key = ((ClientEvent) event).getClientKey();
+                        }
                         Runnable l_runnable = new ExecutableHandler(event);
-                        config.getThreadPool().execute(l_runnable);
+                        config.getThreadPool().execute(l_runnable, key);
                         monitor.eventHandled(DefaultStage.this, event);
                     }
                 }
@@ -278,7 +285,7 @@
      *
      * @author <a href="mailto:aok123@bellsouth.net">Alex Karasulu</a>
      * @author $Author$
-     * @version $Revision$
+     * @version $Revision: 56106 $
      */
     class ExecutableHandler implements Runnable
     {

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/thread/AbstractThreadPool.java	Sun Oct 31 00:50:58 2004
@@ -0,0 +1,214 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+
+/*
+ * @(#) $Id: AbstractThreadPooledEventDispatcher.java 169 2004-10-21 07:22:31Z trustin $
+ */
+package org.apache.seda.thread;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A base class for thread-pooled stages.
+ *
+ * @author Trustin Lee (http://gleamynode.net/)
+ * @version $Rev: 169 $, $Date: 2004-10-21 16:22:31 +0900 (Thu, 21 Oct 2004) $
+ */
+public abstract class AbstractThreadPool implements ThreadPool
+{
+    private static final Runnable FEWER_THREADS =
+        new Runnable()
+        {
+            public void run()
+            {
+            }
+        };
+
+    private String threadNamePrefix = "stage-";
+    private boolean started;
+    protected final List workers = new ArrayList();
+
+    /** <code>true</code> if the size of this pool can be changed even if it is already started. */
+    protected final boolean canChangeSizeOnTheFly;
+    private int threadPoolSize = Runtime.getRuntime().availableProcessors();
+    private int threadPriority = Thread.NORM_PRIORITY;
+    private int threadId = 0;
+
+    protected AbstractThreadPool(boolean canChangeSizeOnTheFly)
+    {
+        this.canChangeSizeOnTheFly = canChangeSizeOnTheFly;
+    }
+
+    public synchronized void start()
+    {
+        if (started)
+        {
+            return;
+        }
+
+        started = true;
+        forkThreads(threadPoolSize);
+    }
+
+    public synchronized void stop()
+    {
+        if (!started)
+        {
+            return;
+        }
+
+        forkThreads(-threadPoolSize);
+        started = false;
+    }
+
+    public boolean isStarted()
+    {
+        return started;
+    }
+
+    public int getThreadPoolSize()
+    {
+        return threadPoolSize;
+    }
+
+    public synchronized void setThreadPoolSize(int newSize)
+    {
+        if (newSize <= 0)
+            throw new IllegalArgumentException();
+
+        if (started)
+        {
+            if (canChangeSizeOnTheFly)
+                forkThreads(newSize - threadPoolSize);
+            else
+                throw new IllegalStateException();
+        }
+
+        threadPoolSize = newSize;
+    }
+
+    public abstract void execute(Runnable runnable, Object hint);
+
+    private void forkThreads(int delta)
+    {
+        if (delta == 0)
+        {
+            return;
+        }
+
+        if (delta > 0)
+        {
+            for (; delta > 0; delta--)
+            {
+                workers.add(newWorker());
+            }
+        }
+        else
+        {
+            for (; delta < 0; delta++)
+            {
+                AbstractWorker worker =
+                    (AbstractWorker) workers.remove(workers.size() - 1);
+                worker.localEventQueue.push(FEWER_THREADS);
+            }
+        }
+    }
+
+    protected abstract AbstractWorker newWorker();
+
+    public int getThreadPriority()
+    {
+        return threadPriority;
+    }
+
+    public synchronized void setThreadPriority(int newPriority)
+    {
+        if ((newPriority < Thread.MIN_PRIORITY) ||
+                (newPriority > Thread.MAX_PRIORITY))
+            throw new IllegalArgumentException();
+
+        this.threadPriority = newPriority;
+
+        if (isStarted())
+        {
+            Iterator it = workers.iterator();
+
+            while (it.hasNext())
+            {
+                AbstractWorker worker = (AbstractWorker) it.next();
+                worker.setPriority(newPriority);
+            }
+        }
+    }
+
+    public String getThreadNamePrefix()
+    {
+        return threadNamePrefix;
+    }
+
+    public void setThreadNamePrefix(String threadNamePrefix)
+    {
+        if (threadNamePrefix == null)
+            throw new NullPointerException();
+
+        this.threadNamePrefix = threadNamePrefix;
+    }
+
+    protected abstract class AbstractWorker extends Thread
+    {
+        protected final RunnableQueue localEventQueue;
+
+        protected AbstractWorker(RunnableQueue eventQueue)
+        {
+            super(getThreadNamePrefix() + '-' + threadId++);
+
+            setPriority(getThreadPriority());
+            setDaemon(true);
+            this.localEventQueue = eventQueue;
+
+            super.start();
+        }
+
+        public final void run()
+        {
+            Runnable runnable;
+            Object item;
+
+            while (isStarted())
+            {
+                runnable = localEventQueue.fetch();
+
+                if (runnable == FEWER_THREADS)
+                {
+                    break;
+                }
+
+                try
+                {
+                    runnable.run();
+                }
+                catch (Throwable t)
+                {
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+}

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/thread/OrderedThreadPool.java	Sun Oct 31 00:50:58 2004
@@ -0,0 +1,74 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+
+package org.apache.seda.thread;
+
+import org.apache.seda.listener.ClientKey;
+
+
+/**
+ * @author Trustin Lee
+ */
+public class OrderedThreadPool extends AbstractThreadPool
+{
+    private int nextWorkerIdx;
+
+    public OrderedThreadPool()
+    {
+        super(false);
+    }
+
+    protected AbstractWorker newWorker()
+    {
+        return new Worker();
+    }
+
+    public void execute(Runnable runnable, Object hint)
+    {
+        start();
+
+        if (hint instanceof ClientKey)
+        {
+            getWorker((ClientKey) hint).localEventQueue.push(runnable);
+        }
+        else
+        {
+            nextWorker().localEventQueue.push(runnable);
+        }
+    }
+
+    private synchronized Worker nextWorker()
+    {
+        int workerIdx = nextWorkerIdx++;
+        nextWorkerIdx %= workers.size();
+        return (Worker) workers.get(workerIdx);
+    }
+
+    private Worker getWorker(ClientKey key)
+    {
+        return (Worker) workers.get(Math.abs(System.identityHashCode(this) ^
+                                             key.hashCode()) % this.getThreadPoolSize());
+    }
+
+    private class Worker extends AbstractWorker
+    {
+        public Worker()
+        {
+            super(new RunnableQueue(16));
+        }
+    }
+}

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/thread/RunnableQueue.java	Sun Oct 31 00:50:58 2004
@@ -0,0 +1,142 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id: EventQueue.java 169 2004-10-21 07:22:31Z trustin $
+ */
+package org.apache.seda.thread;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * A thread-safe event queue.
+ *
+ * @author Trustin Lee (http://gleamynode.net/dev/)
+ * @version $Rev: 169 $, $Date: 2004-10-21 16:22:31 +0900 (Thu, 21 Oct 2004) $
+ */
+class RunnableQueue implements Serializable {
+    private Runnable[] events;
+    private int first = 0;
+    private int last = 0;
+    private int size = 0;
+    private int waitingForNewItem;
+
+    /**
+     * Construct a new, empty <code>Queue</code> with the specified initial
+     * capacity.
+     */
+    public RunnableQueue(int initialCapacity) {
+        events = new Runnable[initialCapacity];
+    }
+
+    /**
+     * Clears this queue.
+     */
+    public synchronized void clear() {
+        Arrays.fill(events, null);
+        first = 0;
+        last = 0;
+        size = 0;
+    }
+
+    /**
+     * Fetches an event entry from this queue.
+     */
+    public synchronized Runnable fetch() {
+        Runnable e;
+        waitingForNewItem++;
+
+        while ((e = fetchNow0()) == null) {
+            try {
+                wait();
+            } catch (InterruptedException ex) {
+            }
+        }
+
+        waitingForNewItem--;
+        return e;
+    }
+
+    public synchronized Runnable fetchNow() {
+        return fetchNow0();
+    }
+
+    private Runnable fetchNow0() {
+        if (size == 0) {
+            return null;
+        }
+
+        Runnable event = events[first];
+        events[first++] = null;
+
+        if (first == events.length) {
+            first = 0;
+        }
+
+        size--;
+        return event;
+    }
+
+    /**
+     * Enqueue into this queue.
+     */
+    public synchronized void push(Runnable event) {
+        if (size == events.length) {
+            // expand queue
+            final int oldLen = events.length;
+            Runnable[] newEvents = new Runnable[oldLen * 2];
+
+            if (first < last) {
+                System.arraycopy(events, first, newEvents, 0, last - first);
+            } else {
+                System.arraycopy(events, first, newEvents, 0, oldLen - first);
+                System.arraycopy(events, 0, newEvents, oldLen - first, last);
+            }
+
+            first = 0;
+            last = oldLen;
+            events = newEvents;
+        }
+
+        events[last++] = event;
+
+        if (last == events.length) {
+            last = 0;
+        }
+
+        size++;
+
+        if (waitingForNewItem > 0) {
+            notify();
+        }
+    }
+
+    /**
+     * Returns <code>true</code> if the queue is empty.
+     */
+    public boolean isEmpty() {
+        return (size == 0);
+    }
+
+    /**
+     * Returns the number of elements in the queue.
+     */
+    public int size() {
+        return size;
+    }
+}

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/thread/SimpleThreadPool.java	Sun Oct 31 00:50:58 2004
@@ -0,0 +1,37 @@
+/*
+ * Created on 2004. 10. 31
+ *
+ * TODO To change the template for this generated file go to
+ * Window - Preferences - Java - Code Style - Code Templates
+ */
+package org.apache.seda.thread;
+
+
+/**
+ * @author Trustin Lee
+ *
+ * TODO To change the template for this generated type comment go to
+ * Window - Preferences - Java - Code Style - Code Templates
+ */
+public class SimpleThreadPool extends AbstractThreadPool {
+    private final RunnableQueue globalEventQueue = new RunnableQueue(16);
+
+	public SimpleThreadPool() {
+		super(true);
+	}
+
+    protected AbstractWorker newWorker() {
+        return new Worker();
+    }
+
+    public void execute(Runnable event, Object hint) {
+    	start();
+        globalEventQueue.push(event);
+    }
+
+    private class Worker extends AbstractWorker {
+        public Worker() {
+            super(globalEventQueue);
+        }
+    }
+}

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/thread/ThreadPool.java	Sun Oct 31 00:50:58 2004
@@ -32,5 +32,5 @@
      *
      * @param runnable the runnable to execute
      */
-    void execute(Runnable runnable);
+    void execute(Runnable runnable, Object hint);
 }

Mime
View raw message