zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [10/17] incubator-zeppelin git commit: Rename package/groupId to org.apache and apply rat plugin.
Date Mon, 06 Apr 2015 04:06:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
deleted file mode 100644
index 078cd3c..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
- */
-public class FIFOScheduler implements Scheduler {
-  List<Job> queue = new LinkedList<Job>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  Job runningJob = null;
-  private String name;
-
-  public FIFOScheduler(String name, ExecutorService executor, SchedulerListener listener) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<Job>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<Job>();
-    Job job = runningJob;
-
-    if (job != null) {
-      ret.add(job);
-    }
-
-    return ret;
-  }
-
-
-
-  @Override
-  public void submit(Job job) {
-    job.setStatus(Status.PENDING);
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  @Override
-  public void run() {
-
-    synchronized (queue) {
-      while (terminate == false) {
-        if (runningJob != null || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-          }
-          continue;
-        }
-
-        runningJob = queue.remove(0);
-
-        final Scheduler scheduler = this;
-        this.executor.execute(new Runnable() {
-          @Override
-          public void run() {
-            if (runningJob.isAborted()) {
-              runningJob.setStatus(Status.ABORT);
-              runningJob.aborted = false;
-              synchronized (queue) {
-                queue.notify();
-              }
-              return;
-            }
-
-            runningJob.setStatus(Status.RUNNING);
-            if (listener != null) {
-              listener.jobStarted(scheduler, runningJob);
-            }
-            runningJob.run();
-            if (runningJob.isAborted()) {
-              runningJob.setStatus(Status.ABORT);
-            } else {
-              if (runningJob.getException() != null) {
-                runningJob.setStatus(Status.ERROR);
-              } else {
-                runningJob.setStatus(Status.FINISHED);
-              }
-            }
-            if (listener != null) {
-              listener.jobFinished(scheduler, runningJob);
-            }
-            // reset aborted flag to allow retry
-            runningJob.aborted = false;
-            runningJob = null;
-            synchronized (queue) {
-              queue.notify();
-            }
-          }
-        });
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
deleted file mode 100644
index 29f72b5..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Skeletal implementation of the Job concept.
- *  - designed for inheritance
- *  - should be run on a separate thread
- *  - maintains internal state: it's status
- *  - supports listeners who are updated on status change
- *
- *  Job class is serialized/deserialized and used server<->client communication
- *  and saving/loading jobs from disk.
- *  Changing/adding/deleting non transitive field name need consideration of that.
- *
- *  @author Leemoonsoo
- */
-public abstract class Job {
-  /**
-   * Job status.
-   *
-   * READY - Job is not running, ready to run.
-   * PENDING - Job is submitted to scheduler. but not running yet
-   * RUNNING - Job is running.
-   * FINISHED - Job finished run. with success
-   * ERROR - Job finished run. with error
-   * ABORT - Job finished by abort
-   *
-   */
-  public static enum Status {
-    READY,
-    PENDING,
-    RUNNING,
-    FINISHED,
-    ERROR,
-    ABORT;
-    boolean isReady() {
-      return this == READY;
-    }
-
-    boolean isRunning() {
-      return this == RUNNING;
-    }
-
-    boolean isPending() {
-      return this == PENDING;
-    }
-  }
-
-  private String jobName;
-  String id;
-  Object result;
-  Date dateCreated;
-  Date dateStarted;
-  Date dateFinished;
-  Status status;
-
-  transient boolean aborted = false;
-
-  String errorMessage;
-  private transient Throwable exception;
-  private transient JobListener listener;
-  private long progressUpdateIntervalMs;
-
-  public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
-    this.jobName = jobName;
-    this.listener = listener;
-    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
-
-    dateCreated = new Date();
-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HHmmss");
-    id = dateFormat.format(dateCreated) + "_" + super.hashCode();
-
-    setStatus(Status.READY);
-  }
-
-  public Job(String jobName, JobListener listener) {
-    this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC);
-  }
-
-  public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) {
-    this.jobName = jobName;
-    this.listener = listener;
-    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
-
-    id = jobId;
-
-    setStatus(Status.READY);
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  @Override
-  public int hashCode() {
-    return id.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return ((Job) o).hashCode() == hashCode();
-  }
-
-  public Status getStatus() {
-    return status;
-  }
-
-  public void setStatus(Status status) {
-    if (this.status == status) {
-      return;
-    }
-    Status before = this.status;
-    Status after = status;
-    if (listener != null) {
-      listener.beforeStatusChange(this, before, after);
-    }
-    this.status = status;
-    if (listener != null) {
-      listener.afterStatusChange(this, before, after);
-    }
-  }
-
-  public void setListener(JobListener listener) {
-    this.listener = listener;
-  }
-
-  public JobListener getListener() {
-    return listener;
-  }
-
-  public boolean isTerminated() {
-    return !this.status.isReady() && !this.status.isRunning() && !this.status.isPending();
-  }
-
-  public boolean isRunning() {
-    return this.status.isRunning();
-  }
-
-  public void run() {
-    JobProgressPoller progressUpdator = null;
-    try {
-      progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs);
-      progressUpdator.start();
-      dateStarted = new Date();
-      result = jobRun();
-      this.exception = null;
-      errorMessage = null;
-      dateFinished = new Date();
-      progressUpdator.terminate();
-    } catch (NullPointerException e) {
-      logger().error("Job failed", e);
-      progressUpdator.terminate();
-      this.exception = e;
-      result = e.getMessage();
-      errorMessage = getStack(e);
-      dateFinished = new Date();
-    } catch (Throwable e) {
-      logger().error("Job failed", e);
-      progressUpdator.terminate();
-      this.exception = e;
-      result = e.getMessage();
-      errorMessage = getStack(e);
-      dateFinished = new Date();
-    } finally {
-      //aborted = false;
-    }
-  }
-
-  public String getStack(Throwable e) {
-    StackTraceElement[] stacks = e.getStackTrace();
-    if (stacks == null) {
-      return "";
-    }
-    String ss = "";
-    for (StackTraceElement s : stacks) {
-      ss += s.toString() + "\n";
-    }
-
-    return ss;
-  }
-
-  public Throwable getException() {
-    return exception;
-  }
-
-  protected void setException(Throwable t) {
-    exception = t;
-    errorMessage = getStack(t);
-  }
-
-  public Object getReturn() {
-    return result;
-  }
-
-  public String getJobName() {
-    return jobName;
-  }
-
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
-  }
-
-  public abstract int progress();
-
-  public abstract Map<String, Object> info();
-
-  protected abstract Object jobRun() throws Throwable;
-
-  protected abstract boolean jobAbort();
-
-  public void abort() {
-    aborted = jobAbort();
-  }
-
-  public boolean isAborted() {
-    return aborted;
-  }
-
-  public Date getDateCreated() {
-    return dateCreated;
-  }
-
-  public Date getDateStarted() {
-    return dateStarted;
-  }
-
-  public Date getDateFinished() {
-    return dateFinished;
-  }
-
-  private Logger logger() {
-    return LoggerFactory.getLogger(Job.class);
-  }
-
-  protected void setResult(Object result) {
-    this.result = result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java
deleted file mode 100644
index 0e573a1..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobListener.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-/**
- * TODO(moon) : add description.
- * 
- * @author Leemoonsoo
- *
- */
-public interface JobListener {
-  public void onProgressUpdate(Job job, int progress);
-
-  public void beforeStatusChange(Job job, Job.Status before, Job.Status after);
-
-  public void afterStatusChange(Job job, Job.Status before, Job.Status after);
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
deleted file mode 100644
index 142842a..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
- */
-public class JobProgressPoller extends Thread {
-  public static final long DEFAULT_INTERVAL_MSEC = 500;
-  Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
-  private Job job;
-  private long intervalMs;
-  boolean terminate = false;
-
-  public JobProgressPoller(Job job, long intervalMs) {
-    this.job = job;
-    this.intervalMs = intervalMs;
-  }
-
-  @Override
-  public void run() {
-    if (intervalMs < 0) {
-      return;
-    } else if (intervalMs == 0) {
-      intervalMs = DEFAULT_INTERVAL_MSEC;
-    }
-
-    while (terminate == false) {
-      JobListener listener = job.getListener();
-      if (listener != null) {
-        try {
-          if (job.isRunning()) {
-            listener.onProgressUpdate(job, job.progress());
-          }
-        } catch (Exception e) {
-          logger.error("Can not get or update progress", e);
-        }
-      }
-      try {
-        Thread.sleep(intervalMs);
-      } catch (InterruptedException e) {
-      }
-    }
-  }
-
-  public void terminate() {
-    terminate = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
deleted file mode 100644
index d28e125..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
- */
-public class ParallelScheduler implements Scheduler {
-  List<Job> queue = new LinkedList<Job>();
-  List<Job> running = new LinkedList<Job>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-
-  public ParallelScheduler(String name, ExecutorService executor, SchedulerListener listener,
-      int maxConcurrency) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-    this.maxConcurrency = maxConcurrency;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<Job>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<Job>();
-    synchronized (queue) {
-      for (Job job : running) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-
-
-  @Override
-  public void submit(Job job) {
-    job.setStatus(Status.PENDING);
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  @Override
-  public void run() {
-
-    synchronized (queue) {
-      while (terminate == false) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-          }
-          continue;
-        }
-
-        Job job = queue.remove(0);
-        running.add(job);
-        Scheduler scheduler = this;
-
-        executor.execute(new JobRunner(scheduler, job));
-      }
-
-
-    }
-  }
-
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-  private class JobRunner implements Runnable {
-    private Scheduler scheduler;
-    private Job job;
-
-    public JobRunner(Scheduler scheduler, Job job) {
-      this.scheduler = scheduler;
-      this.job = job;
-    }
-
-    @Override
-    public void run() {
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-        job.aborted = false;
-
-        synchronized (queue) {
-          running.remove(job);
-          queue.notify();
-        }
-
-        return;
-      }
-
-      job.setStatus(Status.RUNNING);
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      job.run();
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-      } else {
-        if (job.getException() != null) {
-          job.setStatus(Status.ERROR);
-        } else {
-          job.setStatus(Status.FINISHED);
-        }
-      }
-
-      if (listener != null) {
-        listener.jobFinished(scheduler, job);
-      }
-
-      // reset aborted flag to allow retry
-      job.aborted = false;
-      synchronized (queue) {
-        running.remove(job);
-        queue.notify();
-      }
-    }
-  }
-
-
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
deleted file mode 100644
index 14baa9b..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
+++ /dev/null
@@ -1,357 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import com.nflabs.zeppelin.scheduler.Job.Status;
-
-/**
- *
- */
-public class RemoteScheduler implements Scheduler {
-  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
-
-  List<Job> queue = new LinkedList<Job>();
-  List<Job> running = new LinkedList<Job>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-  private RemoteInterpreterProcess interpreterProcess;
-
-  public RemoteScheduler(String name, ExecutorService executor,
-      RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
-      int maxConcurrency) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-    this.interpreterProcess = interpreterProcess;
-    this.maxConcurrency = maxConcurrency;
-  }
-
-  @Override
-  public void run() {
-    while (terminate == false) {
-      Job job = null;
-
-      synchronized (queue) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-          }
-          continue;
-        }
-
-        job = queue.remove(0);
-        running.add(job);
-      }
-
-      // run
-      Scheduler scheduler = this;
-      JobRunner jobRunner = new JobRunner(scheduler, job);
-      executor.execute(jobRunner);
-
-      // wait until it is submitted to the remote
-      while (!jobRunner.isJobSubmittedInRemote()) {
-        synchronized (queue) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<Job>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<Job>();
-    synchronized (queue) {
-      for (Job job : running) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public void submit(Job job) {
-    job.setStatus(Status.PENDING);
-
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-  /**
-   * Role of the class is get status info from remote process from PENDING to
-   * RUNNING status.
-   */
-  private class JobStatusPoller extends Thread {
-    private long initialPeriodMsec;
-    private long initialPeriodCheckIntervalMsec;
-    private long checkIntervalMsec;
-    private boolean terminate;
-    private JobListener listener;
-    private Job job;
-    Status lastStatus;
-
-    public JobStatusPoller(long initialPeriodMsec,
-        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
-        JobListener listener) {
-      this.initialPeriodMsec = initialPeriodMsec;
-      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
-      this.checkIntervalMsec = checkIntervalMsec;
-      this.job = job;
-      this.listener = listener;
-      this.terminate = false;
-    }
-
-    @Override
-    public void run() {
-      long started = System.currentTimeMillis();
-      while (terminate == false) {
-        long current = System.currentTimeMillis();
-        long interval;
-        if (current - started < initialPeriodMsec) {
-          interval = initialPeriodCheckIntervalMsec;
-        } else {
-          interval = checkIntervalMsec;
-        }
-
-        synchronized (this) {
-          try {
-            this.wait(interval);
-          } catch (InterruptedException e) {
-          }
-        }
-
-
-        Status newStatus = getStatus();
-        if (newStatus == null) { // unknown
-          continue;
-        }
-
-        if (newStatus != Status.READY && newStatus != Status.PENDING) {
-          // we don't need more
-          continue;
-        }
-      }
-    }
-
-    public void shutdown() {
-      terminate = true;
-      synchronized (this) {
-        this.notify();
-      }
-    }
-
-
-    private Status getLastStatus() {
-      if (terminate == true) {
-        if (lastStatus != Status.FINISHED &&
-            lastStatus != Status.ERROR &&
-            lastStatus != Status.ABORT) {
-          return Status.FINISHED;
-        } else {
-          return (lastStatus == null) ? Status.FINISHED : lastStatus;
-        }
-      } else {
-        return (lastStatus == null) ? Status.FINISHED : lastStatus;
-      }
-    }
-
-    public synchronized Job.Status getStatus() {
-      if (interpreterProcess.referenceCount() <= 0) {
-        return getLastStatus();
-      }
-
-      Client client;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e) {
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      }
-
-      try {
-        String statusStr = client.getStatus(job.getId());
-        if ("Unknown".equals(statusStr)) {
-          // not found this job in the remote schedulers.
-          // maybe not submitted, maybe already finished
-          Status status = getLastStatus();
-          listener.afterStatusChange(job, null, status);
-          return status;
-        }
-        Status status = Status.valueOf(statusStr);
-        lastStatus = status;
-        listener.afterStatusChange(job, null, status);
-        return status;
-      } catch (TException e) {
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } catch (Exception e) {
-        logger.error("Unknown status", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } finally {
-        interpreterProcess.releaseClient(client);
-      }
-    }
-  }
-
-  private class JobRunner implements Runnable, JobListener {
-    private Scheduler scheduler;
-    private Job job;
-    private boolean jobExecuted;
-    boolean jobSubmittedRemotely;
-
-    public JobRunner(Scheduler scheduler, Job job) {
-      this.scheduler = scheduler;
-      this.job = job;
-      jobExecuted = false;
-      jobSubmittedRemotely = false;
-    }
-
-    public boolean isJobSubmittedInRemote() {
-      return jobSubmittedRemotely;
-    }
-
-    @Override
-    public void run() {
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-        job.aborted = false;
-
-        synchronized (queue) {
-          running.remove(job);
-          queue.notify();
-        }
-
-        return;
-      }
-
-      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
-          job, this);
-      jobStatusPoller.start();
-
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      job.run();
-      jobExecuted = true;
-      jobSubmittedRemotely = true;
-
-      jobStatusPoller.shutdown();
-      try {
-        jobStatusPoller.join();
-      } catch (InterruptedException e) {
-        logger.error("JobStatusPoller interrupted", e);
-      }
-
-      job.setStatus(jobStatusPoller.getStatus());
-      if (listener != null) {
-        listener.jobFinished(scheduler, job);
-      }
-
-      // reset aborted flag to allow retry
-      job.aborted = false;
-
-      synchronized (queue) {
-        running.remove(job);
-        queue.notify();
-      }
-    }
-
-    @Override
-    public void onProgressUpdate(Job job, int progress) {
-    }
-
-    @Override
-    public void beforeStatusChange(Job job, Status before, Status after) {
-    }
-
-    @Override
-    public void afterStatusChange(Job job, Status before, Status after) {
-      if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished.
-        if (jobExecuted) {
-          jobSubmittedRemotely = true;
-          if (job.isAborted()) {
-            job.setStatus(Status.ABORT);
-          } else if (job.getException() != null) {
-            job.setStatus(Status.ERROR);
-          } else {
-            job.setStatus(Status.FINISHED);
-          }
-        }
-        return;
-      }
-
-
-      // Update remoteStatus
-      if (jobExecuted == false) {
-        if (after == Status.FINISHED || after == Status.ABORT
-            || after == Status.ERROR) {
-          // it can be status of last run.
-          // so not updating the remoteStatus
-          return;
-        } else if (after == Status.RUNNING) {
-          jobSubmittedRemotely = true;
-        }
-      } else {
-        jobSubmittedRemotely = true;
-      }
-
-      // status polled by status poller
-      if (job.getStatus() != after) {
-        job.setStatus(after);
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java
deleted file mode 100644
index e772c38..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Scheduler.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.Collection;
-
-/**
- * TODO(moon) : add description.
- * 
- * @author Leemoonsoo
- *
- */
-public interface Scheduler extends Runnable {
-  public String getName();
-
-  public Collection<Job> getJobsWaiting();
-
-  public Collection<Job> getJobsRunning();
-
-  public void submit(Job job);
-
-  public void stop();
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
deleted file mode 100644
index 115e2b1..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
+++ /dev/null
@@ -1,129 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-
-/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
- */
-public class SchedulerFactory implements SchedulerListener {
-  private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
-  ScheduledExecutorService executor;
-  Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
-
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
-
-  public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-      }
-    }
-    return singleton;
-  }
-
-  public SchedulerFactory() throws Exception {
-    executor = Executors.newScheduledThreadPool(100);
-  }
-
-  public void destroy() {
-    executor.shutdown();
-  }
-
-  public Scheduler createOrGetFIFOScheduler(String name) {
-    synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
-        Scheduler s = new FIFOScheduler(name, executor, this);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
-    }
-  }
-
-  public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
-    synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
-        Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
-    }
-  }
-
-  public Scheduler createOrGetRemoteScheduler(
-      String name,
-      RemoteInterpreterProcess interpreterProcess,
-      int maxConcurrency) {
-
-    synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
-        Scheduler s = new RemoteScheduler(
-            name,
-            executor,
-            interpreterProcess,
-            this,
-            maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
-      }
-      return schedulers.get(name);
-    }
-  }
-
-  public Scheduler removeScheduler(String name) {
-    synchronized (schedulers) {
-      Scheduler s = schedulers.remove(name);
-      if (s != null) {
-        s.stop();
-      }
-    }
-    return null;
-  }
-
-  public Collection<Scheduler> listScheduler(String name) {
-    List<Scheduler> s = new LinkedList<Scheduler>();
-    synchronized (schedulers) {
-      for (Scheduler ss : schedulers.values()) {
-        s.add(ss);
-      }
-    }
-    return s;
-  }
-
-  @Override
-  public void jobStarted(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
-
-  }
-
-  @Override
-  public void jobFinished(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
-
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java
deleted file mode 100644
index d551679..0000000
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerListener.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.nflabs.zeppelin.scheduler;
-
-/**
- * TODO(moon) : add description.
- * 
- * @author Leemoonsoo
- *
- */
-public interface SchedulerListener {
-  public void jobStarted(Scheduler scheduler, Job job);
-
-  public void jobFinished(Scheduler scheduler, Job job);
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java
new file mode 100644
index 0000000..8ae7630
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/GUI.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.zeppelin.display.Input.ParamOption;
+
+/**
+ * Holds the state of a paragraph's dynamic form: the values submitted by the
+ * client and the form elements registered while the paragraph runs.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class GUI implements Serializable {
+
+  // form parameters from client, keyed by form id
+  Map<String, Object> params = new HashMap<String, Object>();
+  // form configuration, keyed by form id
+  Map<String, Input> forms = new TreeMap<String, Input>();
+
+  public GUI() {
+
+  }
+
+  public Map<String, Object> getParams() {
+    return params;
+  }
+
+  public void setParams(Map<String, Object> values) {
+    this.params = values;
+  }
+
+  public Map<String, Input> getForms() {
+    return forms;
+  }
+
+  public void setForms(Map<String, Input> forms) {
+    this.forms = forms;
+  }
+
+  /**
+   * Registers a text input whose default value is the empty string.
+   *
+   * @param id form id
+   * @return the value sent by the client, or "" when there is none
+   */
+  public Object input(String id) {
+    return input(id, "");
+  }
+
+  /**
+   * Registers a text input and resolves its current value.
+   *
+   * @param id form id
+   * @param defaultValue value to use when the client sent nothing for this id
+   * @return the value sent by the client, or defaultValue when there is none
+   */
+  public Object input(String id, Object defaultValue) {
+    Object current = clientValueOr(id, defaultValue);
+    forms.put(id, new Input(id, defaultValue));
+    return current;
+  }
+
+  /**
+   * Registers a select form with the given options and resolves its current value.
+   *
+   * @param id form id
+   * @param defaultValue value to use when the client sent nothing for this id
+   * @param options selectable options
+   * @return the value sent by the client, or defaultValue when there is none
+   */
+  public Object select(String id, Object defaultValue, ParamOption[] options) {
+    Object current = clientValueOr(id, defaultValue);
+    forms.put(id, new Input(id, defaultValue, options));
+    return current;
+  }
+
+  /**
+   * Forgets every registered form. The client parameters are kept.
+   */
+  public void clear() {
+    this.forms = new TreeMap<String, Input>();
+  }
+
+  // Looks the id up in the client parameters first and falls back to the default.
+  private Object clientValueOr(String id, Object fallback) {
+    Object fromClient = params.get(id);
+    return fromClient != null ? fromClient : fallback;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java
new file mode 100644
index 0000000..2f7858c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/Input.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Input type: describes one dynamic form element and provides the helpers that
+ * parse <code>${...}</code> placeholders out of a script and substitute them.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class Input implements Serializable {
+  /**
+   * Parameters option.
+   *
+   * Serializable because {@link Input}, which is Serializable, holds an array of it.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public static class ParamOption implements Serializable {
+    Object value;
+    String displayName;
+
+    public ParamOption(Object value, String displayName) {
+      super();
+      this.value = value;
+      this.displayName = displayName;
+    }
+
+    public Object getValue() {
+      return value;
+    }
+
+    public void setValue(Object value) {
+      this.value = value;
+    }
+
+    public String getDisplayName() {
+      return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+      this.displayName = displayName;
+    }
+
+  }
+
+  // "name(display name)"
+  private static final Pattern NAME_AND_DISPLAY_NAME =
+      Pattern.compile("([^(]*)\\s*[(]([^)]*)[)]");
+  // "type:rest"
+  private static final Pattern TYPE_AND_NAME = Pattern.compile("([^:]*)\\s*:\\s*(.*)");
+  // any placeholder, optionally hidden with a leading '_' and optionally carrying "=default"
+  private static final Pattern PLACEHOLDER =
+      Pattern.compile("([_])?[$][{]([^=}]*([=][^}]*)?)[}]");
+  // placeholder that carries a "=default" part
+  private static final Pattern PLACEHOLDER_WITH_DEFAULT =
+      Pattern.compile("[_]?[$][{]([^=}]*[=][^}]*)[}]");
+
+  String name;
+  String displayName;
+  String type;
+  Object defaultValue;
+  ParamOption[] options;
+  boolean hidden;
+
+  public Input(String name, Object defaultValue) {
+    this.name = name;
+    this.displayName = name;
+    this.defaultValue = defaultValue;
+  }
+
+  public Input(String name, Object defaultValue, ParamOption[] options) {
+    this.name = name;
+    this.displayName = name;
+    this.defaultValue = defaultValue;
+    this.options = options;
+  }
+
+
+  public Input(String name, String displayName, String type, Object defaultValue,
+      ParamOption[] options, boolean hidden) {
+    super();
+    this.name = name;
+    this.displayName = displayName;
+    this.type = type;
+    this.defaultValue = defaultValue;
+    this.options = options;
+    this.hidden = hidden;
+  }
+
+  /**
+   * Two inputs are equal when they have the same name.
+   * Returns false for null and for objects that are not an Input.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Input)) {
+      return false;
+    }
+    String otherName = ((Input) o).getName();
+    return name == null ? otherName == null : name.equals(otherName);
+  }
+
+  /**
+   * Hash of the name only, so that it agrees with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return name == null ? 0 : name.hashCode();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getDisplayName() {
+    return displayName;
+  }
+
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public Object getDefaultValue() {
+    return defaultValue;
+  }
+
+  public void setDefaultValue(Object defaultValue) {
+    this.defaultValue = defaultValue;
+  }
+
+  public ParamOption[] getOptions() {
+    return options;
+  }
+
+  public void setOptions(ParamOption[] options) {
+    this.options = options;
+  }
+
+  public boolean isHidden() {
+    return hidden;
+  }
+
+
+  /**
+   * Splits "name(display name)" into its two parts.
+   *
+   * @return {name, displayName}, or null when str has no parenthesized part.
+   *         The name part is returned as matched and is not trimmed on the right.
+   */
+  private static String[] getNameAndDisplayName(String str) {
+    Matcher m = NAME_AND_DISPLAY_NAME.matcher(str.trim());
+    if (!m.find()) {
+      return null;
+    }
+    return new String[] {m.group(1), m.group(2)};
+  }
+
+  /**
+   * Splits "type:name" into its two trimmed parts.
+   *
+   * @return {type, name}, or null when str contains no ':'
+   */
+  private static String[] getType(String str) {
+    Matcher m = TYPE_AND_NAME.matcher(str.trim());
+    if (!m.find()) {
+      return null;
+    }
+    return new String[] {m.group(1).trim(), m.group(2).trim()};
+  }
+
+  /**
+   * Finds every <code>${...}</code> placeholder in the script and describes it as an Input.
+   * The placeholder body has the form
+   * <code>[type:]name[(display name)][=default[,option|option...]]</code>;
+   * a leading '_' before '$' marks the input as hidden.
+   *
+   * @param script text to scan, may be null
+   * @return inputs keyed by variable name; empty when script is null or has no placeholder
+   */
+  public static Map<String, Input> extractSimpleQueryParam(String script) {
+    Map<String, Input> params = new HashMap<String, Input>();
+    if (script == null) {
+      return params;
+    }
+
+    Matcher match = PLACEHOLDER.matcher(script);
+    while (match.find()) {
+      boolean hidden = "_".equals(match.group(1));
+      String m = match.group(2);
+
+      String namePart;
+      String valuePart;
+
+      int p = m.indexOf('=');
+      if (p > 0) {
+        namePart = m.substring(0, p);
+        valuePart = m.substring(p + 1);
+      } else {
+        namePart = m;
+        valuePart = null;
+      }
+
+
+      String varName;
+      String displayName = null;
+      String type = null;
+      String defaultValue = "";
+      ParamOption[] paramOptions = null;
+
+      // get var name type
+      String varNamePart;
+      String[] typeArray = getType(namePart);
+      if (typeArray != null) {
+        type = typeArray[0];
+        varNamePart = typeArray[1];
+      } else {
+        varNamePart = namePart;
+      }
+
+      // get var name and displayname
+      String[] varNameArray = getNameAndDisplayName(varNamePart);
+      if (varNameArray != null) {
+        varName = varNameArray[0];
+        displayName = varNameArray[1];
+      } else {
+        varName = varNamePart.trim();
+      }
+
+      // get defaultValue
+      if (valuePart != null) {
+        // find default value
+        int optionP = valuePart.indexOf(",");
+        if (optionP > 0) { // option available
+          defaultValue = valuePart.substring(0, optionP);
+          String optionPart = valuePart.substring(optionP + 1);
+          String[] options = Input.splitPipe(optionPart);
+
+          paramOptions = new ParamOption[options.length];
+
+          for (int i = 0; i < options.length; i++) {
+
+            String[] optNameArray = getNameAndDisplayName(options[i]);
+            if (optNameArray != null) {
+              paramOptions[i] = new ParamOption(optNameArray[0], optNameArray[1]);
+            } else {
+              paramOptions[i] = new ParamOption(options[i], null);
+            }
+          }
+
+
+        } else { // no option
+          defaultValue = valuePart;
+        }
+
+      }
+
+      Input param = new Input(varName, displayName, type, defaultValue, paramOptions, hidden);
+      params.put(varName, param);
+    }
+
+    // NOTE(review): a variable named "pql" is always dropped; the reason is not visible here.
+    params.remove("pql");
+    return params;
+  }
+
+  /**
+   * Substitutes the placeholders of a script.
+   * Placeholders whose name is a key of params (with a non-null value) get that value,
+   * remaining placeholders with a "=default" part get their default (the text before the
+   * first ',' when options follow), and placeholders without a default are removed.
+   * Values and defaults are inserted literally: '$' and '\' in them have no special meaning.
+   *
+   * @param params values keyed by variable name; entries with a null value are ignored
+   * @param script text containing the placeholders
+   * @return the script with every placeholder substituted
+   */
+  public static String getSimpleQuery(Map<String, Object> params, String script) {
+    String replaced = script;
+
+    for (Map.Entry<String, Object> entry : params.entrySet()) {
+      Object value = entry.getValue();
+      if (value == null) {
+        // same as "no value from client": let the default substitution below handle it
+        continue;
+      }
+      // The key is quoted and the optional "type:" prefix may not run across '}',
+      // so only the placeholder that really names this key is replaced.
+      replaced = replaced.replaceAll(
+          "[_]?[$][{]([^:}]*[:])?" + Pattern.quote(entry.getKey())
+              + "([(][^)]*[)])?(=[^}]*)?[}]",
+          Matcher.quoteReplacement(value.toString()));
+    }
+
+    // Expand the remaining placeholders that carry a default. The matcher's own
+    // positions are used, so any character is allowed inside the placeholder.
+    Matcher match = PLACEHOLDER_WITH_DEFAULT.matcher(replaced);
+    StringBuffer expanded = new StringBuffer();
+    while (match.find()) {
+      String m = match.group(1);
+      String replacement = m.substring(m.indexOf('=') + 1);
+      int optionP = replacement.indexOf(",");
+      if (optionP > 0) {
+        replacement = replacement.substring(0, optionP);
+      }
+      match.appendReplacement(expanded, Matcher.quoteReplacement(replacement));
+    }
+    match.appendTail(expanded);
+    replaced = expanded.toString();
+
+    // Whatever is left has neither a value nor a default: remove it.
+    replaced = replaced.replaceAll("[_]?[$][{]([^=}]*)[}]", "");
+    return replaced;
+  }
+
+
+  /**
+   * Splits on ';' using a lookahead on the double quotes that follow each ';'.
+   */
+  public static String[] split(String str) {
+    return str.split(";(?=([^\"']*\"[^\"']*\")*[^\"']*$)");
+
+  }
+
+  /**
+   * Splits on '|' with the default escape and block rules of
+   * {@link #split(String, String[], boolean)}.
+   */
+  public static String[] splitPipe(String str) {
+    return split(str, '|');
+  }
+
+  public static String[] split(String str, char split) {
+    return split(str, new String[] {String.valueOf(split)}, false);
+  }
+
+  /**
+   * Splits with the default rules: '\' is the escape character, and quotes, ${...},
+   * (...) and &lt;...&gt; are blocks; the last two are declared nestable ("N_" prefix).
+   */
+  public static String[] split(String str, String[] splitters, boolean includeSplitter) {
+    String escapeSeq = "\"',;${}";
+    char escapeChar = '\\';
+
+    String[] blockStart = new String[] {"\"", "'", "${", "N_(", "N_<"};
+    String[] blockEnd = new String[] {"\"", "'", "}", "N_)", "N_>"};
+
+    return split(str, escapeSeq, escapeChar, blockStart, blockEnd, splitters, includeSplitter);
+
+  }
+
+  /**
+   * Block-aware split. Walks str one character at a time; a splitter found outside of any
+   * block ends the current piece, while characters inside a block are copied as they are.
+   *
+   * @param str text to split
+   * @param escapeSeq characters that lose the escape character when escaped; any other
+   *        escaped character keeps the escape character in front of it
+   * @param escapeChar escape character
+   * @param blockStart block openers; an "N_" prefix marks the block as nestable
+   * @param blockEnd block closers, index-aligned with blockStart
+   * @param splitters strings to split on
+   * @param includeSplitter when true the splitter itself is added after each piece
+   * @return the pieces; only the last piece is trimmed
+   */
+  public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart,
+      String[] blockEnd, String[] splitters, boolean includeSplitter) {
+
+    List<String> splits = new ArrayList<String>();
+
+    String curString = "";
+
+    boolean escape = false; // true when escape char is found
+    int lastEscapeOffset = -1;
+    int blockStartPos = -1;
+    List<Integer> blockStack = new LinkedList<Integer>();
+
+    for (int i = 0; i < str.length(); i++) {
+      char c = str.charAt(i);
+
+      // escape char detected
+      if (c == escapeChar && escape == false) {
+        escape = true;
+        continue;
+      }
+
+      // escaped char comes
+      if (escape == true) {
+        if (escapeSeq.indexOf(c) < 0) {
+          curString += escapeChar;
+        }
+        curString += c;
+        escape = false;
+        lastEscapeOffset = curString.length();
+        continue;
+      }
+
+      if (blockStack.size() > 0) { // inside of block
+        curString += c;
+        // check multichar block
+        boolean multicharBlockDetected = false;
+        for (int b = 0; b < blockStart.length; b++) {
+          if (blockStartPos >= 0
+              && getBlockStr(blockStart[b]).compareTo(str.substring(blockStartPos, i)) == 0) {
+            blockStack.remove(0);
+            blockStack.add(0, b);
+            multicharBlockDetected = true;
+            break;
+          }
+        }
+
+        if (multicharBlockDetected == true) {
+          continue;
+        }
+
+        // check if current block is nestable
+        if (isNestedBlock(blockStart[blockStack.get(0)]) == true) {
+          // try to find nested block start
+
+          if (curString.substring(lastEscapeOffset + 1).endsWith(
+              getBlockStr(blockStart[blockStack.get(0)])) == true) {
+            blockStack.add(0, blockStack.get(0)); // block is started
+            blockStartPos = i;
+            continue;
+          }
+        }
+
+        // check if block is finishing
+        if (curString.substring(lastEscapeOffset + 1).endsWith(
+            getBlockStr(blockEnd[blockStack.get(0)]))) {
+          // the block closer is one of the splitters (and not nested block)
+          if (isNestedBlock(blockEnd[blockStack.get(0)]) == false) {
+            for (String splitter : splitters) {
+              if (splitter.compareTo(getBlockStr(blockEnd[blockStack.get(0)])) == 0) {
+                splits.add(curString);
+                if (includeSplitter == true) {
+                  splits.add(splitter);
+                }
+                curString = "";
+                lastEscapeOffset = -1;
+
+                break;
+              }
+            }
+          }
+          blockStartPos = -1;
+          blockStack.remove(0);
+          continue;
+        }
+
+      } else { // not in the block
+        boolean splitted = false;
+        for (String splitter : splitters) {
+          // forward check for splitter
+          int curentLenght = i + splitter.length();
+          if (splitter.compareTo(str.substring(i, Math.min(curentLenght, str.length()))) == 0) {
+            splits.add(curString);
+            if (includeSplitter == true) {
+              splits.add(splitter);
+            }
+            curString = "";
+            lastEscapeOffset = -1;
+            i += splitter.length() - 1;
+            splitted = true;
+            break;
+          }
+        }
+        if (splitted == true) {
+          continue;
+        }
+
+        // add char to current string
+        curString += c;
+
+        // check if block is started
+        for (int b = 0; b < blockStart.length; b++) {
+          if (curString.substring(lastEscapeOffset + 1)
+                       .endsWith(getBlockStr(blockStart[b])) == true) {
+            blockStack.add(0, b); // block is started
+            blockStartPos = i;
+            break;
+          }
+        }
+      }
+    }
+    if (curString.length() > 0) {
+      splits.add(curString.trim());
+    }
+    return splits.toArray(new String[] {});
+
+  }
+
+  // Strips the "N_" nestable marker from a block definition.
+  private static String getBlockStr(String blockDef) {
+    if (blockDef.startsWith("N_")) {
+      return blockDef.substring("N_".length());
+    } else {
+      return blockDef;
+    }
+  }
+
+  // A block definition is nestable when it carries the "N_" marker.
+  private static boolean isNestedBlock(String blockDef) {
+    return blockDef.startsWith("N_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
new file mode 100644
index 0000000..d3d6c1c
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.scheduler.Scheduler;
+
+/**
+ * Wraps an interpreter so that every call into it runs with a dedicated
+ * context classloader installed on the calling thread.
+ *
+ * Each delegating method installs the classloader, calls the wrapped interpreter,
+ * wraps any exception into an InterpreterException and finally restores the
+ * caller's context classloader.
+ *
+ */
+public class ClassloaderInterpreter
+    extends Interpreter
+    implements WrappedInterpreter {
+
+  private ClassLoader cl;
+  private Interpreter intp;
+
+  public ClassloaderInterpreter(Interpreter intp, ClassLoader cl) {
+    super(new Properties());
+    this.cl = cl;
+    this.intp = intp;
+  }
+
+  @Override
+  public Interpreter getInnerInterpreter() {
+    return intp;
+  }
+
+  public ClassLoader getClassloader() {
+    return cl;
+  }
+
+  /**
+   * Installs this wrapper's classloader as the thread context classloader.
+   *
+   * @return the context classloader that was installed before, to be handed
+   *         to {@link #leaveClassloader(ClassLoader)}
+   */
+  private ClassLoader enterClassloader() {
+    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(cl);
+    return oldcl;
+  }
+
+  /**
+   * Remembers the context classloader that is current after the delegated call
+   * (the call may have replaced it) and restores the caller's one.
+   *
+   * @param oldcl value returned by the matching {@link #enterClassloader()}
+   */
+  private void leaveClassloader(ClassLoader oldcl) {
+    cl = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(oldcl);
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.interpret(st, context);
+    } catch (Exception e) {
+      // the cause travels with the rethrown exception, no need to print it here
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+
+  @Override
+  public void open() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.open();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public void close() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.close();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.cancel(context);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getFormType();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getProgress(context);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getScheduler();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.completion(buf, cursor);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+
+  @Override
+  public String getClassName() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getClassName();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.setInterpreterGroup(interpreterGroup);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public InterpreterGroup getInterpreterGroup() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getInterpreterGroup();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public void setClassloaderUrls(URL [] urls) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.setClassloaderUrls(urls);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public URL [] getClassloaderUrls() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getClassloaderUrls();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public void setProperty(Properties property) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      intp.setProperty(property);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public Properties getProperty() {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getProperty();
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+
+  @Override
+  public String getProperty(String key) {
+    ClassLoader oldcl = enterClassloader();
+    try {
+      return intp.getProperty(key);
+    } catch (Exception e) {
+      throw new InterpreterException(e);
+    } finally {
+      leaveClassloader(oldcl);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
new file mode 100644
index 0000000..58dcb64
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for interpreters.
+ * If you want to implement a new Zeppelin interpreter, extend this class.
+ *
+ * Please see,
+ * http://zeppelin.incubator.apache.org/docs/development/writingzeppelininterpreter.html
+ *
+ * open(), close() and interpret() are the three most important methods you need to implement.
+ * cancel(), getProgress() and completion() are good to have.
+ * getFormType() and getScheduler() determine Zeppelin's behavior.
+ *
+ */
+public abstract class Interpreter {
+
+  /**
+   * Opens interpreter. You may want to place your initialize routine here.
+   * open() is called only once
+   */
+  public abstract void open();
+
+  /**
+   * Closes interpreter. You may want to free your resources up here.
+   * close() is called only once
+   */
+  public abstract void close();
+
+  /**
+   * Run code and return result, in synchronous way.
+   *
+   * @param st statements to run
+   * @param context context of the paragraph being run
+   * @return result of the run
+   */
+  public abstract InterpreterResult interpret(String st, InterpreterContext context);
+
+  /**
+   * Optionally implement the canceling routine to abort interpret() method
+   *
+   * @param context context of the paragraph to cancel
+   */
+  public abstract void cancel(InterpreterContext context);
+
+  /**
+   * Dynamic form handling
+   * see http://zeppelin.incubator.apache.org/docs/dynamicform.html
+   *
+   * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}),
+   *         FormType.NATIVE handles form in API
+   */
+  public abstract FormType getFormType();
+
+  /**
+   * Get the progress of the running interpret() method, in percent.
+   *
+   * @param context context of the paragraph being run
+   * @return number between 0-100
+   */
+  public abstract int getProgress(InterpreterContext context);
+
+  /**
+   * Get completion list based on cursor position.
+   * By implementing this method, it enables auto-completion.
+   *
+   * @param buf statements
+   * @param cursor cursor position in statements
+   * @return list of possible completions. Return an empty list if there is nothing to return.
+   */
+  public abstract List<String> completion(String buf, int cursor);
+
+  /**
+   * An interpreter can implement its own scheduler by overriding this method.
+   * There are two default schedulers provided, FIFO and Parallel.
+   * If your interpret() can handle concurrent requests use Parallel, otherwise use FIFO.
+   *
+   * You can get default scheduler by using
+   * SchedulerFactory.singleton().createOrGetFIFOScheduler()
+   * SchedulerFactory.singleton().createOrGetParallelScheduler()
+   *
+   *
+   * @return return scheduler instance.
+   *         This method can be called multiple times and have to return the same instance.
+   *         Can not return null.
+   */
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode());
+  }
+
+  /**
+   * Called when interpreter is no longer used.
+   */
+  public void destroy() {
+    getScheduler().stop();
+  }
+
+
+
+
+
+  static Logger logger = LoggerFactory.getLogger(Interpreter.class);
+  private InterpreterGroup interpreterGroup;
+  private URL [] classloaderUrls;
+  protected Properties property;
+
+  public Interpreter(Properties property) {
+    this.property = property;
+  }
+
+  public void setProperty(Properties property) {
+    this.property = property;
+  }
+
+  /**
+   * Returns the effective properties of this interpreter: the properties it was given,
+   * completed with the non-null default values declared when the class was registered.
+   *
+   * @return a new Properties instance; changing it does not change this interpreter
+   */
+  public Properties getProperty() {
+    Properties p = new Properties();
+    p.putAll(property);
+
+    for (Map.Entry<String, InterpreterProperty> entry : registeredDefaults().entrySet()) {
+      // containsKey, not contains: Properties.contains() looks at the values
+      if (!p.containsKey(entry.getKey())) {
+        String value = entry.getValue().getDefaultValue();
+        if (value != null) {
+          p.put(entry.getKey(), value);
+        }
+      }
+    }
+
+    return p;
+  }
+
+  /**
+   * Returns one property, falling back to the default value declared at registration.
+   *
+   * @param key property name
+   * @return the value, or null when it is neither set nor declared with a default
+   */
+  public String getProperty(String key) {
+    if (property.containsKey(key)) {
+      return property.getProperty(key);
+    }
+
+    InterpreterProperty declared = registeredDefaults().get(key);
+    if (declared != null) {
+      return declared.getDefaultValue();
+    }
+
+    return null;
+  }
+
+  // Properties declared for this class at registration; empty when the class is not registered.
+  private Map<String, InterpreterProperty> registeredDefaults() {
+    RegisteredInterpreter registered = findRegisteredInterpreterByClassName(getClassName());
+    if (registered == null || registered.getProperties() == null) {
+      return Collections.<String, InterpreterProperty>emptyMap();
+    }
+    return registered.getProperties();
+  }
+
+
+  public String getClassName() {
+    return this.getClass().getName();
+  }
+
+  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  public InterpreterGroup getInterpreterGroup() {
+    return this.interpreterGroup;
+  }
+
+  public URL[] getClassloaderUrls() {
+    return classloaderUrls;
+  }
+
+  public void setClassloaderUrls(URL[] classloaderUrls) {
+    this.classloaderUrls = classloaderUrls;
+  }
+
+
+  /**
+   * Type of interpreter.
+   */
+  public static enum FormType {
+    NATIVE, SIMPLE, NONE
+  }
+
+  /**
+   * Represent registered interpreter class
+   */
+  public static class RegisteredInterpreter {
+    private String name;
+    private String group;
+    private String className;
+    private Map<String, InterpreterProperty> properties;
+    private String path;
+
+    public RegisteredInterpreter(String name, String group, String className,
+        Map<String, InterpreterProperty> properties) {
+      super();
+      this.name = name;
+      this.group = group;
+      this.className = className;
+      this.properties = properties;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getGroup() {
+      return group;
+    }
+
+    public String getClassName() {
+      return className;
+    }
+
+    public Map<String, InterpreterProperty> getProperties() {
+      return properties;
+    }
+
+    public void setPath(String path) {
+      this.path = path;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+  }
+
+  /**
+   * Type of Scheduling.
+   */
+  public static enum SchedulingMode {
+    FIFO, PARALLEL
+  }
+
+  public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections
+      .synchronizedMap(new HashMap<String, RegisteredInterpreter>());
+
+  public static void register(String name, String className) {
+    register(name, name, className);
+  }
+
+  public static void register(String name, String group, String className) {
+    register(name, group, className, new HashMap<String, InterpreterProperty>());
+  }
+
+  public static void register(String name, String group, String className,
+      Map<String, InterpreterProperty> properties) {
+    registeredInterpreters.put(name, new RegisteredInterpreter(name, group, className, properties));
+  }
+
+  /**
+   * Looks up a registration by interpreter class name.
+   *
+   * @param className fully qualified class name
+   * @return the registration, or null when no interpreter is registered with that class name
+   */
+  public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
+    // iterating a Collections.synchronizedMap view requires holding the map's lock
+    synchronized (registeredInterpreters) {
+      for (RegisteredInterpreter ri : registeredInterpreters.values()) {
+        if (ri.getClassName().equals(className)) {
+          return ri;
+        }
+      }
+    }
+    return null;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
new file mode 100644
index 0000000..2d70c8e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Map;
+
+import org.apache.zeppelin.display.GUI;
+
+/**
+ * Interpreter context: bundles the paragraph id, title and text together with the
+ * configuration map and GUI object handed to the constructor, and exposes each of
+ * them through a getter.
+ */
+public class InterpreterContext {
+  private final String paragraphId;
+  private final String paragraphTitle;
+  private final String paragraphText;
+  private final Map<String, Object> config;
+  private GUI gui;
+
+  /**
+   * Stores every argument as given; nothing is copied or validated.
+   *
+   * @param paragraphId id of the paragraph
+   * @param paragraphTitle title of the paragraph
+   * @param paragraphText text of the paragraph
+   * @param config configuration map, kept by reference
+   * @param gui GUI object, kept by reference
+   */
+  public InterpreterContext(String paragraphId, String paragraphTitle, String paragraphText,
+                            Map<String, Object> config, GUI gui) {
+    this.gui = gui;
+    this.config = config;
+    this.paragraphText = paragraphText;
+    this.paragraphTitle = paragraphTitle;
+    this.paragraphId = paragraphId;
+  }
+
+  /** @return the paragraph id given at construction */
+  public String getParagraphId() {
+    return this.paragraphId;
+  }
+
+  /** @return the paragraph title given at construction */
+  public String getParagraphTitle() {
+    return this.paragraphTitle;
+  }
+
+  /** @return the paragraph text given at construction */
+  public String getParagraphText() {
+    return this.paragraphText;
+  }
+
+  /** @return the configuration map given at construction (same instance, not a copy) */
+  public Map<String, Object> getConfig() {
+    return this.config;
+  }
+
+  /** @return the GUI object given at construction */
+  public GUI getGui() {
+    return this.gui;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java
new file mode 100644
index 0000000..30c1c0a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Unchecked exception raised by interpreters. It can be created either from a
+ * message or from an underlying cause.
+ */
+public class InterpreterException extends RuntimeException {
+
+  /**
+   * Creates an exception carrying only a message.
+   *
+   * @param message detail message passed to {@link RuntimeException}
+   */
+  public InterpreterException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception wrapping another throwable.
+   *
+   * @param cause underlying cause passed to {@link RuntimeException}
+   */
+  public InterpreterException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
new file mode 100644
index 0000000..834630a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * A {@link LinkedList} of the interpreters that belong to one group. Per the original
+ * description, the group is also the unit of interpreter instantiate, restart, bind
+ * and unbind.
+ */
+public class InterpreterGroup extends LinkedList<Interpreter>{
+  String id;
+
+  /**
+   * Builds an id from the fixed prefix {@code InterpreterGroup_}, the current time in
+   * milliseconds and a random int, joined with underscores.
+   */
+  private static String generateId() {
+    StringBuilder sb = new StringBuilder("InterpreterGroup_");
+    sb.append(System.currentTimeMillis());
+    sb.append("_");
+    sb.append(new Random().nextInt());
+    return sb.toString();
+  }
+
+  /**
+   * Returns this group's id, generating and remembering one on the first call.
+   * The check-and-assign runs while holding this object's monitor.
+   *
+   * @return the group id, never {@code null}
+   */
+  public String getId() {
+    synchronized (this) {
+      String current = id;
+      if (current == null) {
+        current = generateId();
+        id = current;
+      }
+      return current;
+    }
+  }
+
+
+  /**
+   * Merges the properties of every member interpreter into one new {@link Properties}
+   * object. Members are visited in list order, so a later member's value overwrites an
+   * earlier one for the same key.
+   *
+   * @return a freshly created, merged property set
+   */
+  public Properties getProperty() {
+    Properties merged = new Properties();
+    for (Interpreter member : this) {
+      merged.putAll(member.getProperty());
+    }
+    return merged;
+  }
+
+  /** Calls {@code close()} on every member interpreter, in list order. */
+  public void close() {
+    for (Interpreter member : this) {
+      member.close();
+    }
+  }
+
+  /** Calls {@code destroy()} on every member interpreter, in list order. */
+  public void destroy() {
+    for (Interpreter member : this) {
+      member.destroy();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
new file mode 100644
index 0000000..cc13ace
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Describes one interpreter property: its default value and a human-readable
+ * description. Both values are mutable through the setters.
+ */
+public class InterpreterProperty {
+  String defaultValue;
+  String description;
+
+  /**
+   * @param defaultValue initial default value
+   * @param description initial description
+   */
+  public InterpreterProperty(String defaultValue, String description) {
+    this.description = description;
+    this.defaultValue = defaultValue;
+  }
+
+  /** @return the current default value */
+  public String getDefaultValue() {
+    return this.defaultValue;
+  }
+
+  /** @return the current description */
+  public String getDescription() {
+    return this.description;
+  }
+
+  /** @param defaultValue replacement default value */
+  public void setDefaultValue(String defaultValue) {
+    this.defaultValue = defaultValue;
+  }
+
+  /** @param description replacement description */
+  public void setDescription(String description) {
+    this.description = description;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java
new file mode 100644
index 0000000..f077b4e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterPropertyBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InterpreterPropertyBuilder
+ */
+public class InterpreterPropertyBuilder {
+  Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
+
+  public InterpreterPropertyBuilder add(String name, String defaultValue, String description){
+    properties.put(name, new InterpreterProperty(defaultValue, description));
+    return this;
+  }
+
+  public Map<String, InterpreterProperty> build(){
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
new file mode 100644
index 0000000..0659a47
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.Serializable;
+
+/**
+ * Result of an interpreter run: a status {@link Code}, a display {@link Type} and a
+ * message.
+ *
+ * <p>When a result is built from a code and a raw message, a leading magic keyword such
+ * as {@code %html} or {@code %table} (the lower-cased {@link Type} name followed by a
+ * single space or newline) selects the type and is stripped from the stored message.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class InterpreterResult implements Serializable {
+
+  /**
+   *  Type of result after code execution.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public static enum Code {
+    SUCCESS,
+    INCOMPLETE,
+    ERROR
+  }
+
+  /**
+   * Type of Data.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public static enum Type {
+    TEXT,
+    HTML,
+    TABLE,
+    IMG,
+    SVG,
+    NULL
+  }
+
+  Code code;
+  Type type;
+  String msg;
+
+  /**
+   * Creates a result with no message; the type defaults to {@link Type#TEXT}.
+   *
+   * @param code status of the execution
+   */
+  public InterpreterResult(Code code) {
+    this.code = code;
+    this.msg = null;
+    this.type = Type.TEXT;
+  }
+
+  /**
+   * Creates a result from a raw message, detecting and stripping a leading magic keyword.
+   *
+   * @param code status of the execution
+   * @param msg raw message, possibly starting with a magic keyword; may be {@code null}
+   */
+  public InterpreterResult(Code code, String msg) {
+    this.code = code;
+    this.msg = getData(msg);
+    this.type = getType(msg);
+  }
+
+  /**
+   * Creates a result with an explicit type; the message is stored untouched.
+   *
+   * @param code status of the execution
+   * @param type display type of the message
+   * @param msg message, stored as given
+   */
+  public InterpreterResult(Code code, Type type, String msg) {
+    this.code = code;
+    this.msg = msg;
+    this.type = type;
+  }
+
+  /**
+   * Builds the magic keyword for a type, e.g. {@code %html} for {@link Type#HTML}.
+   *
+   * <p>Lower-casing uses {@code Locale.ROOT} on purpose: with the default-locale variant a
+   * Turkish locale turns the 'I' of {@code IMG} into a dotless i, so {@code %img} would
+   * never match.
+   */
+  private static String magicOf(Type t) {
+    return "%" + t.name().toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * Finds the type whose magic keyword, followed by a space or a newline, starts the
+   * message.
+   *
+   * @return the matching type, or {@code null} when the message is {@code null} or has
+   *         no magic prefix
+   */
+  private static Type findMagicType(String msg) {
+    if (msg == null) {
+      return null;
+    }
+    for (Type t : Type.values()) {
+      String magic = magicOf(t);
+      if (msg.startsWith(magic + " ") || msg.startsWith(magic + "\n")) {
+        return t;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Magic is like %html %text.
+   *
+   * @param msg raw message, may be {@code null}
+   * @return the message without its magic keyword and the one separator character after
+   *         it; the message unchanged when it has no magic prefix; {@code null} for
+   *         {@code null} input
+   */
+  private String getData(String msg) {
+    Type detected = findMagicType(msg);
+    if (detected == null) {
+      return msg;
+    }
+    // Skip the keyword plus its separator; yields "" when nothing follows.
+    return msg.substring(magicOf(detected).length() + 1);
+  }
+
+
+  /**
+   * @param msg raw message, may be {@code null}
+   * @return the type named by the magic prefix, or {@link Type#TEXT} when there is none
+   */
+  private Type getType(String msg) {
+    Type detected = findMagicType(msg);
+    return detected == null ? Type.TEXT : detected;
+  }
+
+  /** @return the status code of this result */
+  public Code code() {
+    return code;
+  }
+
+  /** @return the stored message, possibly {@code null} */
+  public String message() {
+    return msg;
+  }
+
+  /** @return the display type of this result */
+  public Type type() {
+    return type;
+  }
+
+  /**
+   * Overrides the display type.
+   *
+   * @param type new display type
+   * @return this result, so calls can be chained
+   */
+  public InterpreterResult type(Type type) {
+    this.type = type;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java
new file mode 100644
index 0000000..c3d3b9e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Interpreter utility functions.
+ */
+public class InterpreterUtils {
+
+  /**
+   * Picks the message most useful to report for an exception. For an
+   * {@link InvocationTargetException} with a non-null cause, that is the cause's
+   * message; in every other case it is the exception's own message.
+   *
+   * @param ex exception to inspect
+   * @return the selected message, which may be {@code null}
+   */
+  public static String getMostRelevantMessage(Exception ex) {
+    Throwable target = (ex instanceof InvocationTargetException) ? ex.getCause() : null;
+    Throwable source = (target != null) ? target : ex;
+    return source.getMessage();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
new file mode 100644
index 0000000..599a24a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.scheduler.Scheduler;
+
+/**
+ * Interpreter wrapper for lazy initialization: the wrapped interpreter's {@code open()}
+ * is deferred until it is first needed.
+ *
+ * <p>{@code interpret}, {@code cancel}, {@code getProgress} and {@code completion} call
+ * {@link #open()} before delegating; every other method delegates directly.
+ */
+public class LazyOpenInterpreter
+    extends Interpreter
+    implements WrappedInterpreter {
+  private Interpreter intp;
+  // volatile: open() reads this flag outside the lock (double-checked locking), so the
+  // write made under the lock must be visible to threads taking the lock-free fast path.
+  volatile boolean opened = false;
+
+  /**
+   * @param intp interpreter to wrap; it is opened on first use, not here
+   */
+  public LazyOpenInterpreter(Interpreter intp) {
+    super(new Properties());
+    this.intp = intp;
+  }
+
+  /** @return the wrapped interpreter */
+  @Override
+  public Interpreter getInnerInterpreter() {
+    return intp;
+  }
+
+  @Override
+  public void setProperty(Properties property) {
+    intp.setProperty(property);
+  }
+
+  @Override
+  public Properties getProperty() {
+    return intp.getProperty();
+  }
+
+  @Override
+  public String getProperty(String key) {
+    return intp.getProperty(key);
+  }
+
+  /**
+   * Opens the wrapped interpreter once. Later calls return immediately; concurrent
+   * first calls are serialized on the wrapped interpreter's monitor so that
+   * {@code intp.open()} runs only once.
+   */
+  @Override
+  public void open() {
+    if (opened) {
+      return;
+    }
+
+    synchronized (intp) {
+      if (!opened) {
+        intp.open();
+        opened = true;
+      }
+    }
+  }
+
+  /** Closes the wrapped interpreter if it is open; otherwise does nothing. */
+  @Override
+  public void close() {
+    synchronized (intp) {
+      if (opened) {
+        intp.close();
+        opened = false;
+      }
+    }
+  }
+
+  /** @return whether the wrapped interpreter is currently marked as opened */
+  public boolean isOpen() {
+    synchronized (intp) {
+      return opened;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    open();
+    return intp.interpret(st, context);
+  }
+
+  // Note: cancelling also triggers open() when the interpreter has not been opened yet.
+  @Override
+  public void cancel(InterpreterContext context) {
+    open();
+    intp.cancel(context);
+  }
+
+  @Override
+  public FormType getFormType() {
+    return intp.getFormType();
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    open();
+    return intp.getProgress(context);
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return intp.getScheduler();
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    open();
+    return intp.completion(buf, cursor);
+  }
+
+  @Override
+  public String getClassName() {
+    return intp.getClassName();
+  }
+
+  @Override
+  public InterpreterGroup getInterpreterGroup() {
+    return intp.getInterpreterGroup();
+  }
+
+  @Override
+  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+    intp.setInterpreterGroup(interpreterGroup);
+  }
+
+  @Override
+  public URL [] getClassloaderUrls() {
+    return intp.getClassloaderUrls();
+  }
+
+  @Override
+  public void setClassloaderUrls(URL [] urls) {
+    intp.setClassloaderUrls(urls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java
new file mode 100644
index 0000000..a12a9aa
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/WrappedInterpreter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Implemented by interpreters that wrap another interpreter and can hand out the
+ * wrapped instance.
+ */
+public interface WrappedInterpreter {
+  /** @return the interpreter wrapped by this object */
+  Interpreter getInnerInterpreter();
+}


Mime
View raw message