incubator-droids-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchap...@apache.org
Subject svn commit: r1068297 - in /incubator/droids/branch/bchapuis: ./ docs/ docs/api/ docs/develop/ docs/images/ docs/themes/ droids-core/src/main/java/org/apache/droids/impl/ droids-crawler-web/ droids-crawler/ tools/
Date Tue, 08 Feb 2011 09:10:23 GMT
Author: bchapuis
Date: Tue Feb  8 09:10:22 2011
New Revision: 1068297

URL: http://svn.apache.org/viewvc?rev=1068297&view=rev
Log:
Removed the projects which were not tied whith droids-core. Removed some ant and forest stuffs.

Added:
    incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
Removed:
    incubator/droids/branch/bchapuis/build-doc.sh
    incubator/droids/branch/bchapuis/build.xml
    incubator/droids/branch/bchapuis/default.properties
    incubator/droids/branch/bchapuis/docs/api/
    incubator/droids/branch/bchapuis/docs/broken-links.xml
    incubator/droids/branch/bchapuis/docs/changes.dispatcher.css
    incubator/droids/branch/bchapuis/docs/changes.html
    incubator/droids/branch/bchapuis/docs/changes.pdf
    incubator/droids/branch/bchapuis/docs/changes.rss
    incubator/droids/branch/bchapuis/docs/contrib.dispatcher.css
    incubator/droids/branch/bchapuis/docs/contrib.html
    incubator/droids/branch/bchapuis/docs/contrib.pdf
    incubator/droids/branch/bchapuis/docs/default.dispatcher.css
    incubator/droids/branch/bchapuis/docs/default.html
    incubator/droids/branch/bchapuis/docs/default.pdf
    incubator/droids/branch/bchapuis/docs/develop/
    incubator/droids/branch/bchapuis/docs/develope.dispatcher.css
    incubator/droids/branch/bchapuis/docs/develope.html
    incubator/droids/branch/bchapuis/docs/develope.pdf
    incubator/droids/branch/bchapuis/docs/favicon.ico
    incubator/droids/branch/bchapuis/docs/images/
    incubator/droids/branch/bchapuis/docs/index.dispatcher.css
    incubator/droids/branch/bchapuis/docs/index.html
    incubator/droids/branch/bchapuis/docs/index.pdf
    incubator/droids/branch/bchapuis/docs/install.dispatcher.css
    incubator/droids/branch/bchapuis/docs/install.html
    incubator/droids/branch/bchapuis/docs/install.pdf
    incubator/droids/branch/bchapuis/docs/linkmap.dispatcher.css
    incubator/droids/branch/bchapuis/docs/linkmap.html
    incubator/droids/branch/bchapuis/docs/linkmap.pdf
    incubator/droids/branch/bchapuis/docs/locationmap.xml
    incubator/droids/branch/bchapuis/docs/mail-lists.dispatcher.css
    incubator/droids/branch/bchapuis/docs/mail-lists.html
    incubator/droids/branch/bchapuis/docs/mail-lists.pdf
    incubator/droids/branch/bchapuis/docs/themes/
    incubator/droids/branch/bchapuis/docs/todo.dispatcher.css
    incubator/droids/branch/bchapuis/docs/todo.html
    incubator/droids/branch/bchapuis/docs/todo.pdf
    incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
    incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java
    incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/SimpleTaskQueue.java
    incubator/droids/branch/bchapuis/droids-crawler/
    incubator/droids/branch/bchapuis/droids-crawler-web/
    incubator/droids/branch/bchapuis/forrest.properties
    incubator/droids/branch/bchapuis/forrest.properties.xml
    incubator/droids/branch/bchapuis/ivy.xml
    incubator/droids/branch/bchapuis/tools/

Added: incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
URL: http://svn.apache.org/viewvc/incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java?rev=1068297&view=auto
==============================================================================
--- incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
(added)
+++ incubator/droids/branch/bchapuis/droids-core/src/main/java/org/apache/droids/impl/TaskMasterImpl.java
Tue Feb  8 09:10:22 2011
@@ -0,0 +1,380 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.droids.impl;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.droids.api.DelayTimer;
+import org.apache.droids.api.Droid;
+import org.apache.droids.api.Task;
+import org.apache.droids.api.TaskExceptionHandler;
+import org.apache.droids.api.TaskExceptionResult;
+import org.apache.droids.api.TaskMaster;
+import org.apache.droids.api.TaskMaster.ExecutionState;
+import org.apache.droids.api.WorkMonitor;
+import org.apache.droids.api.Worker;
+import org.apache.droids.helper.Loggable;
+
+/**
+ *
+ * @author bchapuis
+ */
+public class TaskMasterImpl<T extends Task> extends Loggable implements TaskMaster<T>
+{
+
+  private final long TICKLE_TIME = 250L;
+
+  /**
+   * The execution state
+   */
+  protected volatile ExecutionState state = ExecutionState.STOPPED;
+
+  /**
+   * The delay timer
+   */
+  protected DelayTimer delayTimer;
+
+  /**
+   * The start time
+   */
+  protected Date startTime;
+
+  /**
+   * The end time
+   */
+  protected Date endTime;
+
+  /**
+   * The last completed task
+   */
+  protected T lastCompletedTask;
+
+  /**
+   * The completed task counter
+   */
+  protected AtomicLong completedTasks = new AtomicLong();
+
+  /**
+   * The monitor that that records the processing of tasks
+   */
+  protected WorkMonitor<T> monitor;
+
+  /**
+   * The task exception handler
+   */
+  protected TaskExceptionHandler exceptionHandler;
+
+  /*
+   * The pool size
+   */
+  private int poolSize = 1;
+
+  /**
+   * The pool
+   */
+  private TaskExecutorPool pool ;
+
+  @Override
+  public void start(Queue<T> queue, Droid<T> droid)
+  {
+    if (log.isInfoEnabled()) {
+      log.info("Start the executor service.");
+    }
+
+    state = ExecutionState.RUNNING;
+
+    if (pool == null) {
+      this.pool = new TaskExecutorPool();
+    }
+
+    for (int i = 0; i < poolSize; i++) {
+      pool.execute(new TaskExecutor(droid));
+    }
+  }
+
+  /**
+   * @inheritDoc
+   */
+  @Override
+  public void stop()
+  {
+    // debug
+    if (log.isInfoEnabled()) {
+      log.info("Stop the executor service.");
+    }
+
+    state = ExecutionState.STOPPED;
+
+    // Disable new tasks from being submitted
+    pool.shutdown();
+
+    // Wait a while for existing tasks to terminate
+    try {
+      if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+
+        // Cancel currently executing tasks
+        pool.shutdownNow();
+
+        // Wait a while for to respond to being canceled
+        if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+          if (log.isInfoEnabled()) {
+            log.info("Scheduler did not stop.");
+          }
+        }
+      }
+    } catch (InterruptedException ex) {
+
+      if (log.isInfoEnabled()) {
+        log.info("Force scheduler to stop.");
+      }
+
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+
+    // debug
+    if (log.isInfoEnabled()) {
+      log.info("Scheduler stopped.");
+    }
+
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+  {
+    return pool.awaitTermination(timeout, unit);
+  }
+
+  /**
+   * @inheritDoc
+   */
+  @Override
+  public ExecutionState getExecutionState()
+  {
+    return state;
+  }
+
+  /**
+   * @return
+   * @inheritDoc
+   */
+  public WorkMonitor<T> getMonitor()
+  {
+    return monitor;
+  }
+
+  /**
+   * @param monitor
+   * @inheritDoc
+   */
+  public void setMonitor(WorkMonitor<T> monitor)
+  {
+    if (state == ExecutionState.RUNNING) {
+      throw new IllegalStateException("The TaskMaster must be stopped to set a Monitor.");
+    }
+    this.monitor = monitor;
+  }
+
+  @Override
+  public void setExceptionHandler(TaskExceptionHandler exceptionHandler)
+  {
+    this.exceptionHandler = exceptionHandler;
+  }
+
+  @Override
+  public void setDelayTimer(DelayTimer delayTimer)
+  {
+    this.delayTimer = delayTimer;
+  }
+
+  @Override
+  public Date getFinishedWorking()
+  {
+    return endTime;
+  }
+
+  @Override
+  public T getLastCompletedTask()
+  {
+    return lastCompletedTask;
+  }
+
+  @Override
+  public long getCompletedTasks()
+  {
+    return completedTasks.get();
+  }
+
+  @Override
+  public Date getStartTime()
+  {
+    return startTime;
+  }
+
+  /**
+   * Sets the pool size
+   *
+   * @return
+   */
+  public int getPoolSize()
+  {
+    return poolSize;
+  }
+
+  /**
+   * Returns the size of the pool
+   *
+   * @param poolSize
+   */
+  public void setPoolSize(int poolSize)
+  {
+    pool.setCorePoolSize(this.poolSize = poolSize);
+  }
+
+  private class TaskExecutorPool extends ThreadPoolExecutor
+  {
+
+    private static final long KEEP_ALIVE = 50000L;
+
+    public TaskExecutorPool()
+    {
+      super(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+      this.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable thrwbl)
+    {
+      super.afterExecute(r, thrwbl);
+
+      // try to reexecute the task runner while
+      // the task queue is not empty and while the pool
+      // is still completing the execution of tasks.
+      TaskExecutor taskExecutor = (TaskExecutor) r;
+      while (taskExecutor.getQueue().size() > 0 || getQueue().size() > 0) {
+        if (taskExecutor.getQueue().size() > 0) {
+          execute(r);
+          return;
+        }
+        try {
+          Thread.sleep(TICKLE_TIME);
+        } catch (InterruptedException e) {
+          log.error(e);
+        }
+      } 
+
+      taskExecutor.getDroid().finished();
+      state = ExecutionState.COMPLETED;
+      shutdownNow();
+
+    }
+  }
+
+  private class TaskExecutor implements Runnable
+  {
+
+    private final Droid<T> droid;
+    private final Queue<T> queue;
+    private final Worker<T> worker;
+
+    public TaskExecutor(Droid<T> droid)
+    {
+      this.droid = droid;
+      this.queue = droid.getQueue();
+      this.worker = droid.getNewWorker();
+    }
+
+    public Droid<T> getDroid()
+    {
+      return droid;
+    }
+
+    public Queue<T> getQueue()
+    {
+      return queue;
+    }
+
+    public Worker getWorker()
+    {
+      return worker;
+    }
+
+    @Override
+    public void run()
+    {
+      // poll the last task
+      T task = queue.poll();
+
+      // execute the task
+      if (task != null) {
+        try {
+          // monitor the execution of the task
+          if (monitor != null) {
+            monitor.beforeExecute(task, worker);
+          }
+
+          // debug
+          if (log.isDebugEnabled()) {
+            log.debug("Worker [" + worker + "] execute task [" + task + "].");
+          }
+
+          // execute the task
+          worker.execute(task);
+
+          // debug
+          if (log.isDebugEnabled()) {
+            log.debug("Worker [" + worker + "] executed task [" + task + "] with success.");
+          }
+
+          // monitor the execution of the task
+          if (monitor != null) {
+            monitor.afterExecute(task, worker, null);
+          }
+
+          // set the monitored variables
+          completedTasks.incrementAndGet();
+          lastCompletedTask = task;
+
+        } catch (Exception ex) {
+          // debug
+          if (log.isDebugEnabled()) {
+            log.debug("Worker [" + worker + "] executed task [" + task + "] without success.");
+          }
+
+          // debug
+          if (log.isErrorEnabled()) {
+            log.error(ex);
+          }
+
+          // monitor the exception
+          if (monitor != null) {
+            monitor.afterExecute(task, worker, ex);
+          }
+
+          // handler the exception
+          if (ex != null) {
+            TaskExceptionResult result = exceptionHandler.handleException(ex);
+
+            // stop the execution in case of a fatal exception
+            if (TaskExceptionResult.FATAL.equals(result)) {
+              state = ExecutionState.STOPPED;
+            }
+
+            droid.finished();
+            pool.shutdownNow();
+            
+          }
+        }
+      }
+    }
+  }
+}



Mime
View raw message