zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject zeppelin git commit: ZEPPELIN-3558. Refactor Scheduler component
Date Tue, 18 Sep 2018 06:51:50 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 76ac96142 -> bec799e42


ZEPPELIN-3558. Refactor Scheduler component

### What is this PR for?

This PR refactors the scheduler component. It introduces AbstractScheduler, which holds the code common to all schedulers; FIFOScheduler and ParallelScheduler become much cleaner as a result.

### What type of PR is it?
[ Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3558

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3176 from zjffdu/ZEPPELIN-3558 and squashes the following commits:

2dca83bba [Jeff Zhang] ZEPPELIN-3558. Refactor Scheduler component


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/bec799e4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/bec799e4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/bec799e4

Branch: refs/heads/master
Commit: bec799e42a9c8ad14563e573068a6204abdb4a7c
Parents: 76ac961
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Apr 26 08:28:38 2018 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Tue Sep 18 14:51:30 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/groovy/GroovyInterpreter.java      |  10 +-
 .../remote/RemoteInterpreterServer.java         |  23 +-
 .../zeppelin/scheduler/AbstractScheduler.java   | 147 +++++++++++++
 .../zeppelin/scheduler/ExecutorFactory.java     |  45 ++--
 .../zeppelin/scheduler/FIFOScheduler.java       | 165 +-------------
 .../zeppelin/scheduler/ParallelScheduler.java   | 174 +--------------
 .../apache/zeppelin/scheduler/Scheduler.java    |  16 +-
 .../zeppelin/scheduler/SchedulerFactory.java    |  44 ++--
 .../zeppelin/scheduler/SchedulerListener.java   |  27 ---
 .../scheduler/SchedulerThreadFactory.java       |  39 ++++
 .../zeppelin/scheduler/FIFOSchedulerTest.java   |  42 +---
 .../scheduler/ParallelSchedulerTest.java        |  11 +-
 .../interpreter/ManagedInterpreterGroup.java    |   7 +-
 .../interpreter/remote/RemoteInterpreter.java   |   5 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |  10 -
 .../zeppelin/scheduler/RemoteScheduler.java     | 220 +++----------------
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   4 -
 17 files changed, 306 insertions(+), 683 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java
----------------------------------------------------------------------
diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java
index 607a6d5..cf50c8f 100644
--- a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java
+++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -112,14 +111,7 @@ public class GroovyInterpreter extends Interpreter {
   }
 
   private Job getRunningJob(String paragraphId) {
-    Job foundJob = null;
-    Collection<Job> jobsRunning = getScheduler().getJobsRunning();
-    for (Job job : jobsRunning) {
-      if (job.getId().equals(paragraphId)) {
-        foundJob = job;
-      }
-    }
-    return foundJob;
+    return getScheduler().getJob(paragraphId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 11e6bdb..e588e40 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -643,9 +643,9 @@ public class RemoteInterpreterServer extends Thread
     logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
     Interpreter intp = getInterpreter(sessionId, className);
     String jobId = interpreterContext.getParagraphId();
-    Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
+    Job job = intp.getScheduler().getJob(jobId);
 
-    if (job != null) {
+    if (job != null && job.getStatus() == Status.PENDING) {
       job.setStatus(Status.ABORT);
     } else {
       try {
@@ -797,20 +797,13 @@ public class RemoteInterpreterServer extends Thread
         logger.info("getStatus:" + Status.UNKNOWN.name());
         return Status.UNKNOWN.name();
       }
-      //TODO(zjffdu) ineffient for loop interpreter and its jobs
-      for (Interpreter intp : interpreters) {
-        for (Job job : intp.getScheduler().getJobsRunning()) {
-          if (jobId.equals(job.getId())) {
-            logger.info("getStatus:" + job.getStatus().name());
-            return job.getStatus().name();
-          }
-        }
 
-        for (Job job : intp.getScheduler().getJobsWaiting()) {
-          if (jobId.equals(job.getId())) {
-            logger.info("getStatus:" + job.getStatus().name());
-            return job.getStatus().name();
-          }
+      for (Interpreter intp : interpreters) {
+        Job job = intp.getScheduler().getJob(jobId);
+        logger.info("job:" + job);
+        if (job != null) {
+          logger.info("getStatus: " + job.getStatus().name());
+          return job.getStatus().name();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
new file mode 100644
index 0000000..c264b9b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Abstract class for scheduler implementation. Implementor just need to implement method
+ * runJobInScheduler.
+ */
+public abstract class AbstractScheduler implements Scheduler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduler.class);
+
+  protected String name;
+  protected volatile boolean terminate = false;
+  protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
+  protected Map<String, Job> jobs = new ConcurrentHashMap<>();
+
+
+  public AbstractScheduler(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  @Override
+  public List<Job> getAllJobs() {
+    return new ArrayList<>(jobs.values());
+  }
+
+  @Override
+  public Job getJob(String jobId) {
+    return jobs.get(jobId);
+  }
+
+  @Override
+  public void submit(Job job) {
+    job.setStatus(Job.Status.PENDING);
+    queue.add(job);
+    jobs.put(job.getId(), job);
+  }
+
+  @Override
+  public Job cancel(String jobId) {
+    Job job = jobs.remove(jobId);
+    job.abort();
+    return job;
+  }
+
+  @Override
+  public void run() {
+    while (!terminate) {
+      Job runningJob = null;
+      try {
+        runningJob = queue.take();
+      } catch (InterruptedException e) {
+        LOGGER.warn("{} is interrupted", getClass().getSimpleName(), e);
+        break;
+      }
+
+      runJobInScheduler(runningJob);
+    }
+  }
+
+  public abstract void runJobInScheduler(Job job);
+
+  @Override
+  public void stop() {
+    terminate = true;
+    for (Job job : queue) {
+      job.aborted = true;
+      job.jobAbort();
+    }
+  }
+
+  /**
+   * This is the logic of running job.
+   * Subclass can use this method and can customize where and when to run this method.
+   *
+   * @param runningJob
+   */
+  protected void runJob(Job runningJob) {
+    if (runningJob.isAborted()) {
+      runningJob.setStatus(Job.Status.ABORT);
+      runningJob.aborted = false;
+      return;
+    }
+
+    LOGGER.info("Job " + runningJob.getId() + " started by scheduler " + name);
+    // Don't set RUNNING status when it is RemoteScheduler, update it via JobStatusPoller
+    if (!getClass().getSimpleName().equals("RemoteScheduler")) {
+      runningJob.setStatus(Job.Status.RUNNING);
+    }
+    runningJob.run();
+    Object jobResult = runningJob.getReturn();
+    if (runningJob.isAborted()) {
+      runningJob.setStatus(Job.Status.ABORT);
+      LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " +
+          runningJob.getErrorMessage());
+    } else if (runningJob.getException() != null) {
+      LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
+          runningJob.getReturn());
+      runningJob.setStatus(Job.Status.ERROR);
+    } else if (jobResult != null && jobResult instanceof InterpreterResult
+        && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) {
+      LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
+          runningJob.getReturn());
+      runningJob.setStatus(Job.Status.ERROR);
+    } else {
+      LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " +
+          runningJob.getReturn());
+      runningJob.setStatus(Job.Status.FINISHED);
+    }
+
+    LOGGER.info("Job " + runningJob.getId() + " finished by scheduler " + name);
+    // reset aborted flag to allow retry
+    runningJob.aborted = false;
+    jobs.remove(runningJob.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
index c09af6d..02b7e72 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -22,60 +22,53 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
- *
+ * Factory class for Executor
  */
 public class ExecutorFactory {
-  private static ExecutorFactory _executor;
+  private static ExecutorFactory instance;
   private static Long _executorLock = new Long(0);
 
-  Map<String, ExecutorService> executor = new HashMap<>();
+  private Map<String, ExecutorService> executors = new HashMap<>();
 
-  public ExecutorFactory() {
+  private ExecutorFactory() {
 
   }
 
   public static ExecutorFactory singleton() {
-    if (_executor == null) {
+    if (instance == null) {
       synchronized (_executorLock) {
-        if (_executor == null) {
-          _executor = new ExecutorFactory();
+        if (instance == null) {
+          instance = new ExecutorFactory();
         }
       }
     }
-    return _executor;
-  }
-
-  public ExecutorService getDefaultExecutor() {
-    return createOrGet("default");
-  }
-
-  public ExecutorService createOrGet(String name) {
-    return createOrGet(name, 100);
+    return instance;
   }
 
   public ExecutorService createOrGet(String name, int numThread) {
-    synchronized (executor) {
-      if (!executor.containsKey(name)) {
-        executor.put(name, Executors.newScheduledThreadPool(numThread));
+    synchronized (executors) {
+      if (!executors.containsKey(name)) {
+        executors.put(name, Executors.newScheduledThreadPool(numThread,
+            new SchedulerThreadFactory(name)));
       }
-      return executor.get(name);
+      return executors.get(name);
     }
   }
 
   public void shutdown(String name) {
-    synchronized (executor) {
-      if (executor.containsKey(name)) {
-        ExecutorService e = executor.get(name);
+    synchronized (executors) {
+      if (executors.containsKey(name)) {
+        ExecutorService e = executors.get(name);
         e.shutdown();
-        executor.remove(name);
+        executors.remove(name);
       }
     }
   }
 
 
   public void shutdownAll() {
-    synchronized (executor) {
-      for (String name : executor.keySet()){
+    synchronized (executors) {
+      for (String name : executors.keySet()) {
         shutdown(name);
       }
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index fd467b6..30e0763 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -17,168 +17,25 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 /**
  * FIFOScheduler runs submitted job sequentially
  */
-public class FIFOScheduler implements Scheduler {
-  List<Job> queue = new LinkedList<>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  Job runningJob = null;
-  private String name;
-
-  static Logger LOGGER = LoggerFactory.getLogger(FIFOScheduler.class);
-
-  public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<>();
-    Job job = runningJob;
-
-    if (job != null) {
-      ret.add(job);
-    }
-
-    return ret;
-  }
+public class FIFOScheduler extends AbstractScheduler {
 
+  private Executor executor;
 
-
-  @Override
-  public void submit(Job job) {
-    job.setStatus(Status.PENDING);
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
+  FIFOScheduler(String name) {
+    super(name);
+    executor = Executors.newSingleThreadExecutor(
+        new SchedulerThreadFactory("FIFOScheduler-Worker-"));
   }
 
-
   @Override
-  public Job removeFromWaitingQueue(String jobId) {
-    synchronized (queue) {
-      Iterator<Job> it = queue.iterator();
-      while (it.hasNext()) {
-        Job job = it.next();
-        if (job.getId().equals(jobId)) {
-          it.remove();
-          return job;
-        }
-      }
-    }
-    return null;
+  public void runJobInScheduler(final Job job) {
+    // run job in the SingleThreadExecutor since this is FIFO.
+    executor.execute(() -> runJob(job));
   }
-
-  @Override
-  public void run() {
-
-    synchronized (queue) {
-      while (terminate == false) {
-        synchronized (queue) {
-          if (runningJob != null || queue.isEmpty() == true) {
-            try {
-              queue.wait(500);
-            } catch (InterruptedException e) {
-              LOGGER.error("Exception in FIFOScheduler while run queue.wait", e);
-            }
-            continue;
-          }
-
-          runningJob = queue.remove(0);
-        }
-
-        final Scheduler scheduler = this;
-        this.executor.execute(new Runnable() {
-          @Override
-          public void run() {
-            if (runningJob.isAborted()) {
-              runningJob.setStatus(Status.ABORT);
-              runningJob.aborted = false;
-              synchronized (queue) {
-                queue.notify();
-              }
-              return;
-            }
-
-            runningJob.setStatus(Status.RUNNING);
-            if (listener != null) {
-              listener.jobStarted(scheduler, runningJob);
-            }
-            runningJob.run();
-            Object jobResult = runningJob.getReturn();
-            if (runningJob.isAborted()) {
-              runningJob.setStatus(Status.ABORT);
-              LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " +
-                  runningJob.getErrorMessage());
-            } else if (runningJob.getException() != null) {
-              LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-                  runningJob.getReturn());
-              runningJob.setStatus(Status.ERROR);
-            } else if (jobResult != null && jobResult instanceof InterpreterResult
-                && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) {
-              LOGGER.debug("Job Error, " + runningJob.getId() + ", " +
-                  runningJob.getReturn());
-              runningJob.setStatus(Status.ERROR);
-            } else {
-              LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " +
-                  runningJob.getReturn());
-              runningJob.setStatus(Status.FINISHED);
-            }
-
-            if (listener != null) {
-              listener.jobFinished(scheduler, runningJob);
-            }
-            // reset aborted flag to allow retry
-            runningJob.aborted = false;
-            runningJob = null;
-            synchronized (queue) {
-              queue.notify();
-            }
-          }
-        });
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index 6f67cd7..1c12c03 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -17,179 +17,25 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.Executors;
 
 /**
  * Parallel scheduler runs submitted job concurrently.
  */
-public class ParallelScheduler implements Scheduler {
-  List<Job> queue = new LinkedList<>();
-  List<Job> running = new LinkedList<>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-
-  static Logger LOGGER = LoggerFactory.getLogger(ParallelScheduler.class);
-
-  public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener,
-      int maxConcurrency) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-    this.maxConcurrency = maxConcurrency;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Job removeFromWaitingQueue(String jobId) {
-    synchronized (queue) {
-      Iterator<Job> it = queue.iterator();
-      while (it.hasNext()) {
-        Job job = it.next();
-        if (job.getId().equals(jobId)) {
-          it.remove();
-          return job;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : running) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-
+public class ParallelScheduler extends AbstractScheduler {
 
-  @Override
-  public void submit(Job job) {
-    job.setStatus(Status.PENDING);
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  @Override
-  public void run() {
-    while (terminate == false) {
-      Job job = null;
-      synchronized (queue) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            LOGGER.error("Exception in MockInterpreterAngular while interpret queue.wait", e);
-          }
-          continue;
-        }
-
-        job = queue.remove(0);
-        running.add(job);
-      }
-      Scheduler scheduler = this;
-
-      executor.execute(new JobRunner(scheduler, job));
-    }
-  }
-
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-  private class JobRunner implements Runnable {
-    private Scheduler scheduler;
-    private Job job;
-
-    JobRunner(Scheduler scheduler, Job job) {
-      this.scheduler = scheduler;
-      this.job = job;
-    }
-
-    @Override
-    public void run() {
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-        job.aborted = false;
-
-        synchronized (queue) {
-          running.remove(job);
-          queue.notify();
-        }
-
-        return;
-      }
-
-      job.setStatus(Status.RUNNING);
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      job.run();
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-      } else {
-        if (job.getException() != null) {
-          job.setStatus(Status.ERROR);
-        } else {
-          job.setStatus(Status.FINISHED);
-        }
-      }
-
-      if (listener != null) {
-        listener.jobFinished(scheduler, job);
-      }
+  private ExecutorService executor;
 
-      // reset aborted flag to allow retry
-      job.aborted = false;
-      synchronized (queue) {
-        running.remove(job);
-        queue.notify();
-      }
-    }
+  ParallelScheduler(String name, int maxConcurrency) {
+    super(name);
+    this.executor = Executors.newFixedThreadPool(maxConcurrency,
+        new SchedulerThreadFactory("ParallelScheduler-Worker-"));
   }
 
-
   @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
+  public void runJobInScheduler(final Job runningJob) {
+    // submit this job to a FixedThreadPool so that at most maxConcurrencyJobs running
+    executor.execute(() -> runJob(runningJob));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
index 3055727..d2b68b3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -17,21 +17,27 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.Collection;
+import java.util.List;
 
 /**
- * Interface for scheduler
+ * Interface for scheduler. Scheduler is used for manage the lifecycle of job.
+ * Including query, submit and cancel job.
+ *
+ * Scheduler can run both in Zeppelin Server and Interpreter Process. e.g. RemoveScheduler run
+ * in Zeppelin Server side while FIFOScheduler run in Interpreter Process.
  */
 public interface Scheduler extends Runnable {
+
   String getName();
 
-  Collection<Job> getJobsWaiting();
+  List<Job> getAllJobs();
 
-  Collection<Job> getJobsRunning();
+  Job getJob(String jobId);
 
   void submit(Job job);
 
-  Job removeFromWaitingQueue(String jobId);
+  Job cancel(String jobId);
 
   void stop();
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index b629ef7..a4d16ee 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -17,21 +17,25 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
 /**
- * Factory class for creating schedulers
+ * Factory class for creating schedulers except RemoteScheduler as RemoteScheudler runs in
+ * zeppelin server process instead of interpreter process.
  *
  */
-public class SchedulerFactory implements SchedulerListener {
-  private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
+public class SchedulerFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerFactory.class);
+  private static final String SCHEDULER_EXECUTOR_NAME = "SchedulerFactory";
+
   protected ExecutorService executor;
-  protected Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+  protected Map<String, Scheduler> schedulers = new HashMap<>();
 
   private static SchedulerFactory singleton;
   private static Long singletonLock = new Long(0);
@@ -43,7 +47,7 @@ public class SchedulerFactory implements SchedulerListener {
           try {
             singleton = new SchedulerFactory();
           } catch (Exception e) {
-            logger.error(e.toString(), e);
+            LOGGER.error(e.toString(), e);
           }
         }
       }
@@ -51,8 +55,10 @@ public class SchedulerFactory implements SchedulerListener {
     return singleton;
   }
 
-  SchedulerFactory() throws Exception {
-    executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100);
+  SchedulerFactory() {
+    ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
+    executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME,
+        zConf.getInt("zeppelin.scheduler.threadpool.size", 100));
   }
 
   public void destroy() {
@@ -62,7 +68,7 @@ public class SchedulerFactory implements SchedulerListener {
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(name)) {
-        Scheduler s = new FIFOScheduler(name, executor, this);
+        FIFOScheduler s = new FIFOScheduler(name);
         schedulers.put(name, s);
         executor.execute(s);
       }
@@ -73,7 +79,7 @@ public class SchedulerFactory implements SchedulerListener {
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(name)) {
-        Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
+        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
         schedulers.put(name, s);
         executor.execute(s);
       }
@@ -81,6 +87,7 @@ public class SchedulerFactory implements SchedulerListener {
     }
   }
 
+  
   public Scheduler createOrGetScheduler(Scheduler scheduler) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(scheduler.getName())) {
@@ -104,15 +111,4 @@ public class SchedulerFactory implements SchedulerListener {
     return executor;
   }
 
-  @Override
-  public void jobStarted(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName());
-
-  }
-
-  @Override
-  public void jobFinished(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName());
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
deleted file mode 100644
index 9a6b3ed..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-/**
- * TODO(moon) : add description.
- */
-public interface SchedulerListener {
-  void jobStarted(Scheduler scheduler, Job job);
-
-  void jobFinished(Scheduler scheduler, Job job);
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
new file mode 100644
index 0000000..fe0711e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.scheduler;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SchedulerThreadFactory implements ThreadFactory {
+
+  private String namePrefix;
+  private AtomicLong count = new AtomicLong(1);
+
+  public SchedulerThreadFactory(String namePrefix) {
+    this.namePrefix = namePrefix;
+  }
+
+  @Override
+  public Thread newThread(Runnable r) {
+    Thread thread = new Thread(r);
+    thread.setName(namePrefix + count.getAndIncrement());
+    return thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
index 807b5ee..e2d91ad 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.scheduler;
 
 import junit.framework.TestCase;
 import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.Test;
 
 public class FIFOSchedulerTest extends TestCase {
 
@@ -31,13 +32,12 @@ public class FIFOSchedulerTest extends TestCase {
 
   @Override
   public void tearDown() {
-
+    schedulerSvc.destroy();
   }
 
+  @Test
   public void testRun() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
-    assertEquals(0, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
 
     Job job1 = new SleepingJob("job1", null, 500);
     Job job2 = new SleepingJob("job2", null, 500);
@@ -48,48 +48,17 @@ public class FIFOSchedulerTest extends TestCase {
 
     assertEquals(Status.RUNNING, job1.getStatus());
     assertEquals(Status.PENDING, job2.getStatus());
-    assertEquals(1, s.getJobsRunning().size());
-    assertEquals(1, s.getJobsWaiting().size());
-
 
     Thread.sleep(500);
     assertEquals(Status.FINISHED, job1.getStatus());
     assertEquals(Status.RUNNING, job2.getStatus());
     assertTrue((500 < (Long) job1.getReturn()));
-    assertEquals(1, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
-
+    s.stop();
   }
 
+  @Test
   public void testAbort() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
-    assertEquals(0, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
-
-    Job job1 = new SleepingJob("job1", null, 500);
-    Job job2 = new SleepingJob("job2", null, 500);
-
-    s.submit(job1);
-    s.submit(job2);
-
-    Thread.sleep(200);
-
-    job1.abort();
-    job2.abort();
-
-    Thread.sleep(200);
-
-    assertEquals(Status.ABORT, job1.getStatus());
-    assertEquals(Status.ABORT, job2.getStatus());
-
-    assertTrue((500 > (Long) job1.getReturn()));
-    assertEquals(null, job2.getReturn());
-  }
-
-  public void testRemoveFromWaitingQueue() throws InterruptedException {
-    Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
-    assertEquals(0, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
 
     Job job1 = new SleepingJob("job1", null, 500);
     Job job2 = new SleepingJob("job2", null, 500);
@@ -109,5 +78,6 @@ public class FIFOSchedulerTest extends TestCase {
 
     assertTrue((500 > (Long) job1.getReturn()));
     assertEquals(null, job2.getReturn());
+    s.stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
index 2495142..9b9b9ba 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.scheduler;
 
 import junit.framework.TestCase;
 import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.Test;
 
 public class ParallelSchedulerTest extends TestCase {
 
@@ -32,13 +33,12 @@ public class ParallelSchedulerTest extends TestCase {
 
   @Override
   public void tearDown() {
-
+    schedulerSvc.destroy();
   }
 
+  @Test
   public void testRun() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2);
-    assertEquals(0, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
 
     Job job1 = new SleepingJob("job1", null, 500);
     Job job2 = new SleepingJob("job2", null, 500);
@@ -52,17 +52,12 @@ public class ParallelSchedulerTest extends TestCase {
     assertEquals(Status.RUNNING, job1.getStatus());
     assertEquals(Status.RUNNING, job2.getStatus());
     assertEquals(Status.PENDING, job3.getStatus());
-    assertEquals(2, s.getJobsRunning().size());
-    assertEquals(1, s.getJobsWaiting().size());
 
     Thread.sleep(500);
 
     assertEquals(Status.FINISHED, job1.getStatus());
     assertEquals(Status.FINISHED, job2.getStatus());
     assertEquals(Status.RUNNING, job3.getStatus());
-    assertEquals(1, s.getJobsRunning().size());
-    assertEquals(0, s.getJobsWaiting().size());
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index e1470df..faa77f1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -121,12 +121,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
 
     for (Interpreter interpreter : interpreters) {
       Scheduler scheduler = interpreter.getScheduler();
-      for (Job job : scheduler.getJobsRunning()) {
-        job.abort();
-        job.setStatus(Job.Status.ABORT);
-        LOGGER.info("Job " + job.getJobName() + " aborted ");
-      }
-      for (Job job : scheduler.getJobsWaiting()) {
+      for (Job job : scheduler.getAllJobs()) {
         job.abort();
         job.setStatus(Job.Status.ABORT);
         LOGGER.info("Job " + job.getJobName() + " aborted ");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 6f9f81f..36f6021 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -389,10 +389,7 @@ public class RemoteInterpreter extends Interpreter {
         RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-"
             + sessionId,
         SchedulerFactory.singleton().getExecutor(),
-        sessionId,
-        this,
-        SchedulerFactory.singleton(),
-        maxConcurrency);
+        this);
     return SchedulerFactory.singleton().createOrGetScheduler(s);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 0397075..288bb71 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -475,16 +475,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
       return true;
     }
 
-    Job job = scheduler.removeFromWaitingQueue(getId());
-    if (job != null) {
-      job.setStatus(Status.ABORT);
-    } else {
-      try {
-        interpreter.cancel(getInterpreterContext(null));
-      } catch (InterpreterException e) {
-        throw new RuntimeException(e);
-      }
-    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 202ceb2..4c9de8a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -17,153 +17,45 @@
 
 package org.apache.zeppelin.scheduler;
 
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 /**
- * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
- *
+ * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter.
+ * It is a kind of FIFOScheduler, but it only runs the next job after the current job has been
+ * submitted to the remote.
  */
-public class RemoteScheduler implements Scheduler {
-  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
+public class RemoteScheduler extends AbstractScheduler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteScheduler.class);
 
-  List<Job> queue = new LinkedList<>();
-  List<Job> running = new LinkedList<>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-  private final String sessionId;
   private RemoteInterpreter remoteInterpreter;
+  private ExecutorService executor;
 
-  public RemoteScheduler(String name, ExecutorService executor, String sessionId,
-                         RemoteInterpreter remoteInterpreter, SchedulerListener listener,
-                         int maxConcurrency) {
-    this.name = name;
+  public RemoteScheduler(String name,
+                         ExecutorService executor,
+                         RemoteInterpreter remoteInterpreter) {
+    super(name);
     this.executor = executor;
-    this.listener = listener;
-    this.sessionId = sessionId;
     this.remoteInterpreter = remoteInterpreter;
-    this.maxConcurrency = maxConcurrency;
-  }
-
-  @Override
-  public void run() {
-    while (terminate == false) {
-      Job job = null;
-
-      synchronized (queue) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run queue.wait", e);
-          }
-          continue;
-        }
-
-        job = queue.remove(0);
-        running.add(job);
-      }
-
-      // run
-      Scheduler scheduler = this;
-      JobRunner jobRunner = new JobRunner(scheduler, job);
-      executor.execute(jobRunner);
-
-      // wait until it is submitted to the remote
-      while (!jobRunner.isJobSubmittedInRemote()) {
-        synchronized (queue) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
-                "queue.wait", e);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Job removeFromWaitingQueue(String jobId) {
-    synchronized (queue) {
-      Iterator<Job> it = queue.iterator();
-      while (it.hasNext()) {
-        Job job = it.next();
-        if (job.getId().equals(jobId)) {
-          it.remove();
-          return job;
-        }
-      }
-    }
-    return null;
   }
 
   @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : running) {
-        if (job.getStatus() == Status.RUNNING) {
-          ret.add(job);
-        } else {
-          logger.error(
-                  "Tried to add {} to list of running jobs, but job status is {}",
-                  job.getJobName(),
-                  job.getStatus().toString()
-          );
-        }
+  public void runJobInScheduler(Job job) {
+    JobRunner jobRunner = new JobRunner(this, job);
+    executor.execute(jobRunner);
+    // wait until it is submitted to the remote
+    while (!jobRunner.isJobSubmittedInRemote()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        LOGGER.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
+            "queue.wait", e);
       }
     }
-    return ret;
-  }
-
-  @Override
-  public void submit(Job job) {
-    if (terminate) {
-      throw new RuntimeException("Scheduler already terminated");
-    }
-    job.setStatus(Status.PENDING);
-
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
   }
 
   /**
@@ -180,8 +72,8 @@ public class RemoteScheduler implements Scheduler {
     volatile Status lastStatus;
 
     public JobStatusPoller(long initialPeriodMsec,
-        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
-        JobListener listener) {
+                           long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
+                           JobListener listener) {
       setName("JobStatusPoller-" + job.getId());
       this.initialPeriodMsec = initialPeriodMsec;
       this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
@@ -207,7 +99,7 @@ public class RemoteScheduler implements Scheduler {
           try {
             this.wait(interval);
           } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run this.wait", e);
+            LOGGER.error("Exception in RemoteScheduler while run this.wait", e);
           }
         }
 
@@ -255,9 +147,11 @@ public class RemoteScheduler implements Scheduler {
 
     public synchronized Status getStatus() {
       if (!remoteInterpreter.isOpened()) {
+        LOGGER.info("status:" + getLastStatus());
         return getLastStatus();
       }
       Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
+      LOGGER.info("getStatus:" + status.name());
       if (status == Status.UNKNOWN) {
         // not found this job in the remote schedulers.
         // maybe not submitted, maybe already finished
@@ -265,19 +159,19 @@ public class RemoteScheduler implements Scheduler {
       }
       lastStatus = status;
       listener.onStatusChange(job, null, status);
+      LOGGER.info("status:" + getLastStatus());
       return status;
     }
   }
 
-  //TODO(zjffdu) need to refactor the schdule module which is too complicated
   private class JobRunner implements Runnable, JobListener {
     private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
-    private Scheduler scheduler;
+    private RemoteScheduler scheduler;
     private Job job;
     private volatile boolean jobExecuted;
     volatile boolean jobSubmittedRemotely;
 
-    public JobRunner(Scheduler scheduler, Job job) {
+    public JobRunner(RemoteScheduler scheduler, Job job) {
       this.scheduler = scheduler;
       this.job = job;
       jobExecuted = false;
@@ -290,65 +184,18 @@ public class RemoteScheduler implements Scheduler {
 
     @Override
     public void run() {
-      if (job.isAborted()) {
-        synchronized (queue) {
-          job.setStatus(Status.ABORT);
-          job.aborted = false;
-
-          running.remove(job);
-          queue.notify();
-        }
-        jobSubmittedRemotely = true;
-
-        return;
-      }
-
       JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
           job, this);
       jobStatusPoller.start();
-
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      job.run();
-
+      scheduler.runJob(job);
       jobExecuted = true;
       jobSubmittedRemotely = true;
-
       jobStatusPoller.shutdown();
       try {
         jobStatusPoller.join();
       } catch (InterruptedException e) {
         logger.error("JobStatusPoller interrupted", e);
       }
-
-      // set job status based on result.
-      Object jobResult = job.getReturn();
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-      } else if (job.getException() != null) {
-        logger.debug("Job ABORT, " + job.getId() + ", " + job.getErrorMessage());
-        job.setStatus(Status.ERROR);
-      } else if (jobResult != null && jobResult instanceof InterpreterResult
-          && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-        logger.debug("Job Error, " + job.getId() + ", " + job.getErrorMessage());
-        job.setStatus(Status.ERROR);
-      } else {
-        logger.debug("Job Finished, " + job.getId() + ", Result: " + job.getReturn());
-        job.setStatus(Status.FINISHED);
-      }
-
-      synchronized (queue) {
-        if (listener != null) {
-          listener.jobFinished(scheduler, job);
-        }
-
-        // reset aborted flag to allow retry
-        job.aborted = false;
-
-        running.remove(job);
-        queue.notify();
-      }
     }
 
     @Override
@@ -380,13 +227,4 @@ public class RemoteScheduler implements Scheduler {
     }
   }
 
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bec799e4/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 8dd83dd..05f7bea 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -116,8 +116,6 @@ public class RemoteSchedulerTest extends AbstractInterpreterTest
     assertTrue(job.isRunning());
 
     Thread.sleep(5 * TICK_WAIT);
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(1, scheduler.getJobsRunning().size());
 
     cycles = 0;
     while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
@@ -126,8 +124,6 @@ public class RemoteSchedulerTest extends AbstractInterpreterTest
     }
 
     assertTrue(job.isTerminated());
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(0, scheduler.getJobsRunning().size());
 
     intpA.close();
     schedulerSvc.removeScheduler("test");


Mime
View raw message