incubator-droids-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From thors...@apache.org
Subject svn commit: r772441 - in /incubator/droids/trunk/droids-core/src/main/java/org/apache/droids: api/PausableTaskMaster.java api/TaskMaster.java impl/MultiThreadedTaskMaster.java
Date Wed, 06 May 2009 22:01:27 GMT
Author: thorsten
Date: Wed May  6 22:01:26 2009
New Revision: 772441

URL: http://svn.apache.org/viewvc?rev=772441&view=rev
Log:
DROIDS-46
MultiThreadedTaskMaster (WorkRunner) memory leak
DUE-TO: Mingfai Ma

Apply patch that had been put together by Ryan. 

Thanks Ryan and Mingfai.

Modified:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
(original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
Wed May  6 22:01:26 2009
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.droids.api;
-
-
-/**
- * A pausable task master interface
- */
-public interface PausableTaskMaster<T extends Task> extends TaskMaster<T> {
-  void pause();
-  void resume();
-}

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
(original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
Wed May  6 22:01:26 2009
@@ -28,7 +28,6 @@
   public enum ExecutionState {
     INITALIZED,
     RUNNING,
-    PAUSED,
     COMPLETE
   };
  

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=772441&r1=772440&r2=772441&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
(original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Wed May  6 22:01:26 2009
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,131 +16,227 @@
  */
 package org.apache.droids.impl;
 
+import java.io.IOException;
 import java.util.Date;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.droids.api.DelayTimer;
 import org.apache.droids.api.Droid;
-import org.apache.droids.api.PausableTaskMaster;
 import org.apache.droids.api.Task;
 import org.apache.droids.api.TaskExceptionHandler;
 import org.apache.droids.api.TaskExceptionResult;
+import org.apache.droids.api.TaskMaster;
 import org.apache.droids.api.TaskQueue;
 import org.apache.droids.api.WorkMonitor;
 import org.apache.droids.api.Worker;
-import org.apache.droids.helper.Loggable;
+import org.apache.droids.exception.DroidsException;
 
-public class MultiThreadedTaskMaster<T extends Task> 
-     extends Loggable implements PausableTaskMaster<T>
-{
+/**
+ * This task master provides a base implementation that support multithreaded task processing
powered by a
+ * ThreadPoolExecutor.
+ * <p/>
+ * By default, the ThreadPoolExecutor uses a bounded blocking queue with a size as same as
the maxThreads. As long as
+ * there is any outstanding task in the Task Queue and the Executor has spare capacity, a
new Thread will be created to
+ * poll and handle one task from the Task Queue. User may set to use another 'pool' implementation,
and the nextTask()
+ * method may be overriden by sub-class.
+ * <p/>
+ * This Task Master doesn't support: pausing, monitoring etc. TaskExceptionHandler is unimplemented.
+ * <p/>
+ * And there is no failure handling mechanism, e.g. if the JVM of Task Master is crashed,
any task polled from the Task Queue
+ * will be loss. (this point makes sense only if the Task Queue is persistent)
+ *
+ * @param <T>
+ */
+public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T>
{
+  protected final Log log = LogFactory.getLog(MultiThreadedTaskMaster.class);
   private static final long KEEP_ALIVE = 50000L;
-  
-  private PausableThreadPoolExecutor pool = null;
-  private int maxThreads = 0;
-  private TaskQueue<T> queue = null;
-  private Droid<T> droid = null;
-  private DelayTimer delayTimer = null;
-  private WorkMonitor<T> monitor = null;
-
-  private Date startedWorking = null;
-  private Date finishedWorking = null;
-  private T lastCompletedTask = null;
-  private ExecutionState state = ExecutionState.INITALIZED;
+  private static final long TICKLE_TIME = 100L;
+
+  protected ThreadPoolExecutor pool = null;
+  protected int maxThreads = 0;
+  protected TaskQueue<T> queue = null;
+  protected Droid<T> droid = null;
+  protected DelayTimer delayTimer = null;
+  protected WorkMonitor<T> monitor = null;
+
+  protected Date startedWorking = null;
+  protected Date finishedWorking = null;
+  protected T lastCompletedTask = null;
+  protected volatile ExecutionState state = ExecutionState.INITALIZED;
+  protected AtomicLong completedCount = new AtomicLong();
+  protected TaskExceptionHandler exHandler;
 
-  private TaskExceptionHandler exHandler;
-  
   /**
    * The queue has been initialized
    */
-  public void processAllTasks(final TaskQueue<T> queue, final Droid<T> droid)

-  {
+  public void processAllTasks(final TaskQueue<T> queue, final Droid<T> droid)
{
+    if( state == ExecutionState.RUNNING ) {
+      // TODO? throw an error?
+      log.info("already processing tasks" );
+      return;
+    }
+    
     this.queue = queue;
     this.droid = droid;
     this.startedWorking = new Date();
     this.finishedWorking = null;
     this.state = ExecutionState.RUNNING;
 
-    if( !queue.hasNext() ) {
-      log.info( "no tasks. finishing now" );
+    if (!queue.hasNext()) {
+      log.info("no tasks. finishing now");
       terminate();
       return;
     }
     
-    int n = getMaxThreads();
+    // start the pool
+    int poolSize = getMaxThreads();
+    // user may set a thread pool before calling the processAllTasks
+    this.pool = this.pool != null ? this.pool : new ThreadPoolExecutor(poolSize, poolSize,
KEEP_ALIVE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize));
+    this.pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//task
is not yet run, there is no impact to discard fail to submit task
     if (log.isInfoEnabled()) {
-      log.info("Number of concurrent workers: " + n);
+      log.info("processAllTasks() - created ThreadPoolExecutor - poolSize: " + poolSize);
     }
-    // start the pool
-    this.pool = new PausableThreadPoolExecutor(n, n, KEEP_ALIVE,
-        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
     
-    startWorkers();
-    log.info("Finshed invocation, waiting for workers to finish.");
-  }
-  
-  
-  private void finishedRunner(WorkerRunner runner) 
-  { 
-    synchronized (this) {
-      lastCompletedTask = runner.task;
-      pool.remove(runner);
-      if (log.isDebugEnabled()) {
-        log.debug("Worker '" + runner.task.getId() + "' has finished.");
-      }
-
-      boolean terminate = false;
-      
-      Exception ex = runner.exception;
-      if (ex != null) {
-        TaskExceptionResult result = TaskExceptionResult.WARN;  
-        if (exHandler != null) {
-          result = exHandler.handleException(ex); 
-        }
-        switch (result) {
-        case WARN:
-          log.warn(ex.toString(), ex);
-          break;
-        case FATAL:
-          log.warn(ex.getMessage(), ex);
-          terminate = true;
-          break;
+    // process tasks in a new thread
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    executor.execute( new Runnable() {
+      @Override
+      public void run() {
+        while (queue.hasNext() || pool.getActiveCount() > 0) {
+          if (log.isTraceEnabled()) {
+            log.trace("processAllTasks() - pool.activeCount: " + pool.getActiveCount() +
", pool.maximumPoolSize(): " + pool.getMaximumPoolSize());
+          }
+          
+          if (queue.hasNext() && pool.getActiveCount() < pool.getMaximumPoolSize())
{
+            nextTask(pool, queue);
+          } 
+          else {
+            try {
+              if (log.isDebugEnabled()) 
+                log.debug("processAllTasks() - no task or thread pool is full, to check again
in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
+              Thread.sleep(TICKLE_TIME);
+            } 
+            catch (InterruptedException e) {
+              log.error(e);
+            }
+          }
         }
+        state = ExecutionState.COMPLETE;
+        log.info("Finshed invocation, waiting for workers to finish.");
       }
-      
-      if (terminate) {
-        terminate();
-      } 
-      else {
-        int cnt = startWorkers();
+    });
+  }
+
+  /**
+   * This method is designed for sub-classing
+   * <p/>
+   * TODO: refactor and create a specialized thread that
+   * - construct with the delayTimer and transparently apply the delay
+   * - construct with the Task and transparent set the lastCompleteTask
+   * <p/>
+   * TODO: consider to provide all variables as argument so that the method overrider needs
not to call the parent
+   * variables
+   * <p/>
+   * TODO: caller of this method could use the Future to track the task completion and perform
the termination and
+   * other checking.
+   *
+   * @param executor
+   * @param queue
+   * @return
+   */
+  protected Future nextTask(ExecutorService executor, final TaskQueue<? extends T>
queue) {
+    return executor.submit(new Runnable() {
+      public void run() {
+        String threadName = Thread.currentThread().getName();
+        T task = queue.next();
+        long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
+        if (log.isDebugEnabled())
+          log.debug("run() - begin - thread: " + threadName + ", task: " + task.getId() +
", delay: " + delay);
         
-        // shutdown the queue...
-        if( cnt == 0 && !queue.hasNext() ) {
-          if( pool.getActiveCount() > 1 ) {
-            // keep running
+        Worker<T> worker = null;
+        Exception ex = null;
+        try {
+          if( delay > 0 ) {
+            Thread.sleep(delay); // gets the current thread
           }
-          else {
+          worker = droid.getNewWorker();
+          if( monitor != null ) {
+            monitor.beforeExecute( task, worker );
+          }
+          worker.execute(task);
+          lastCompletedTask = task;
+        } catch (DroidsException e) {
+          ex = e;
+        } catch (IOException e) {
+          ex = e;
+        } catch (InterruptedException e) {
+          ex = e;
+        } 
+        finally {
+          // Handle any exceptions
+          boolean terminate = false;
+          if (ex != null) {
+            try {
+              TaskExceptionResult result = TaskExceptionResult.WARN;  
+              if (exHandler != null) {
+                result = exHandler.handleException(ex); 
+              }
+              switch (result) {
+              case WARN:
+                log.warn(ex.toString(), ex);
+                break;
+              case FATAL:
+                log.warn(ex.getMessage(), ex);
+                terminate = true;
+                break;
+              case IGNORE: break; // nothing
+              }
+            }
+            catch( Exception e2 ) {
+              log.error( e2.getMessage(), e2 );
+            }
+          }
+
+          if( monitor != null ) {
+            monitor.afterExecute( task, worker, ex );
+          }
+          
+          completedCount.incrementAndGet();
+          if (log.isInfoEnabled()) {
+            log.info("run() - done - completedCount: " + completedCount + ", thread: " +
threadName +
+                ", task: " + task.getId() + ", queue.size: " + queue.getSize() +
+                ", pool.activeCount: " + pool.getActiveCount());
+          }
+
+          if( terminate ) {
+            terminate();
+          }
+          else if (!queue.hasNext() && pool.getActiveCount() == 1) { //TODO it isn't
a very good idea to check the activeCount inside the thread. an alternative way is to use
the future to track the thread status and do the termination
+            log.info("run() - no more queued task and active threads, set to terminate");
             terminate();
           }
         }
       }
-    }
+    });
   }
-  
-  private void terminate() 
-  {
-    if( pool != null ) {
+
+
+  private void terminate() {
+    if (pool != null) {
       shutdownAndAwaitTermination();
     }
-    
+
     long elapsed = System.currentTimeMillis() - startedWorking.getTime();
     if (log.isInfoEnabled()) {
-      log.info("All threads have finished. (elapsed: " + elapsed + ")" );
+      log.info("All threads have finished. (elapsed: " + elapsed + ")");
     }
     finishedWorking = new Date();
     state = ExecutionState.COMPLETE;
@@ -148,86 +244,32 @@
   }
 
   /**
-   * Will start a new worker.
-   * @return the id of the worker we just started.
+   * Set the maximum allowed thread count. If the new value is less 
+   * than the current value, excess existing threads will be terminated 
+   * when they become idle.
+   * 
+   * @param count
    */
-  private int startWorkers(){
-    
-    int cnt = 0;
-    try {
-      while( queue.hasNext() && cnt++ < maxThreads ) {
-//        // checking the "activeCount" can be expensive...
-//        // The getActiveCount is often wrong for only a single thread!
-//        // the javadocs say the number is approxiate, so in the case we only 
-//        // see one active worker, we will assume there are none...
-//        int activeWorkers = pool.getActiveCount();
-//        if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
-//          activeWorkers = 0;
-//        }
-//        if( activeWorkers >= getMaxThreads() ) {
-//          return;  // don't make a new runner...
-//        }
-        pool.execute( new WorkerRunner() );
-      }
-    }
-    catch( RejectedExecutionException ex ) {
-      log.trace( "rejected", ex );
+  public void setMaxThreads( int count ) {
+    if( pool != null ) {
+      pool.setMaximumPoolSize( count );
+      
+      // NOTE, the main loop should fill up any spaces
+      // within TICKLE_TIME
     }
-    
-//    if( cnt == 0 && !queue.hasNext() && pool.getActiveCount() == 0 ) {
-//      terminate(); // nothing to do, we must be done...
-//    }
-    return cnt;
-  }
-
-
-  public final void setExceptionHandler(TaskExceptionHandler exHandler) {
-    this.exHandler = exHandler;
-  }
-  
-
-  public DelayTimer getDelayTimer() {
-    return delayTimer;
-  }
-
-
-  public void setDelayTimer(DelayTimer delayTimer) {
-    this.delayTimer = delayTimer;
-  }
-  
-  public WorkMonitor<T> getMonitor() {
-    return monitor;
-  }
-
-  public void setMonitor(WorkMonitor<T> monitor) {
-    this.monitor = monitor;
+    maxThreads = count;
   }
   
   /**
-   * Adjust number of allowed threads
-   * @param maxThreads
-   */
-  public  void setMaxThreads(int maxThreads) {
-    if( pool != null && maxThreads != this.maxThreads ) {
-      pool.setMaximumPoolSize( maxThreads );
-      pool.setCorePoolSize( maxThreads );
-      if( state == ExecutionState.RUNNING ) {
-        startWorkers(); // fill up the queue with new workers...
-      }
-    }
-    this.maxThreads = maxThreads;
-  }
-
-  /**
    * Get number of maximum allowed threads
+   *
    * @return the number of maximum threads that we allow
    */
   public int getMaxThreads() {
     return maxThreads;
   }
 
-  public ExecutionState getExecutionState()
-  {
+  public ExecutionState getExecutionState() {
     return state;
   }
 
@@ -236,9 +278,9 @@
    * If it is not working by asking nice to shutdown just kill all
    * threads.
    */
-  protected void shutdownAndAwaitTermination() {
+  public void shutdownAndAwaitTermination() {
     log.info("SHUTTING DOWN");
-    if( pool == null ) {
+    if (pool == null) {
       return;
     }
     pool.shutdown(); // Disable new tasks from being submitted
@@ -252,7 +294,7 @@
           log.info("Pool did not terminate");
         }
       }
-    } 
+    }
     catch (InterruptedException ie) {
       // (Re-)Cancel if current thread also interrupted
       pool.shutdownNow();
@@ -260,55 +302,27 @@
       Thread.currentThread().interrupt();
     }
   }
-  
-  /**
-   * Wrapper to call finishedWorker() after 
-   */
-  class WorkerRunner implements Runnable {
-    T task;
-    Exception exception;
-    
-    @Override
-    public void run() {
-      try {
-        task = queue.next();
-        if( task != null ) {
-          if( delayTimer != null ) {
-            long delay = delayTimer.getDelayMillis();
-            if( delay > 0 ) {
-              try {
-                Thread.sleep( delay );
-              } 
-              catch (InterruptedException e) {} // we don't really care....
-            }
-          }
 
-          Worker<T> worker = droid.getNewWorker();
-          try {
-            if( monitor != null ) {
-              monitor.beforeExecute(task, worker);
-            }
-            worker.execute( task );
-            if( monitor != null ) {
-              monitor.afterExecute(task, worker, null);
-            }
-          }
-          catch (Exception ex) {
-            exception = ex;
-            if( monitor != null ) {
-              monitor.afterExecute(task, worker, ex);
-            }
-          }
-        }
-      }
-      finally {
-        finishedRunner( this );
-      }
-    }
+
+  public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    log.info("awaitTermination()");
+    pool.awaitTermination(timeout, unit);
+  }
+
+  public final void setExceptionHandler(TaskExceptionHandler exHandler) {
+    this.exHandler = exHandler;
+  }
+
+  public DelayTimer getDelayTimer() {
+    return delayTimer;
+  }
+
+  public void setDelayTimer(DelayTimer delayTimer) {
+    this.delayTimer = delayTimer;
   }
 
   public long getCompletedTasks() {
-    return (pool==null) ? 0 : pool.getCompletedTaskCount();
+    return completedCount.get();
   }
 
   public Date getFinishedWorking() {
@@ -323,65 +337,12 @@
     return startedWorking;
   }
   
-  public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-    pool.awaitTermination(timeout, unit);
-  }
-  
-  
-  /**
-   * "pause" support
-   */
-  private class PausableThreadPoolExecutor extends ThreadPoolExecutor {
-    ReentrantLock pauseLock = new ReentrantLock();
-    Condition unpaused = pauseLock.newCondition();
-
-    public PausableThreadPoolExecutor(
-          int corePoolSize,
-          int maximumPoolSize,
-          long keepAliveTime,
-          TimeUnit unit,
-          BlockingQueue<Runnable> workQueue) {
-      super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
-    }
-
-    @Override
-    protected void beforeExecute(Thread t, Runnable r) {
-      super.beforeExecute(t, r);
-      pauseLock.lock();
-      try {
-        while (state == ExecutionState.PAUSED) unpaused.await();
-      } catch(InterruptedException ie) {
-        t.interrupt();
-      } finally {
-        pauseLock.unlock();
-      }
-    }
+  public WorkMonitor<T> getMonitor() {
+    return monitor;
   }
 
-  public void pause() {
-    if( pool == null ) {
-      return;
-    }
-    pool.pauseLock.lock();
-    try {
-      state = ExecutionState.PAUSED;
-    } finally {
-      pool.pauseLock.unlock();
-    }
+  public void setMonitor(WorkMonitor<T> monitor) {
+    this.monitor = monitor;
   }
 
-  public void resume() {
-    if( pool == null ) {
-      return;
-    }
-    pool.pauseLock.lock();
-    try {
-      state = ExecutionState.RUNNING;
-      pool.unpaused.signalAll();
-    } finally {
-      pool.pauseLock.unlock();
-    }
-  }
 }
-
-



Mime
View raw message