zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [02/50] incubator-zeppelin git commit: Implement RemoteScheduler
Date Fri, 20 Mar 2015 18:01:24 GMT
Implement RemoteScheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/7d3f55de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/7d3f55de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/7d3f55de

Branch: refs/heads/master
Commit: 7d3f55defd3acce5c3d98a3366766d78e908aaec
Parents: 3490f2e
Author: Lee moon soo <leemoonsoo@gmail.com>
Authored: Wed Mar 11 15:08:29 2015 +0900
Committer: Lee moon soo <leemoonsoo@gmail.com>
Committed: Wed Mar 11 15:08:29 2015 +0900

----------------------------------------------------------------------
 .../interpreter/LazyOpenInterpreter.java        |   7 +-
 .../interpreter/remote/RemoteInterpreter.java   |   6 +-
 .../remote/RemoteInterpreterServer.java         |  28 +-
 .../thrift/RemoteInterpreterService.java        | 788 +++++++++++++++++++
 .../zeppelin/scheduler/FIFOScheduler.java       |  30 +-
 .../java/com/nflabs/zeppelin/scheduler/Job.java |  42 +-
 .../zeppelin/scheduler/ParallelScheduler.java   |  35 +-
 .../zeppelin/scheduler/RemoteScheduler.java     | 286 +++++++
 .../zeppelin/scheduler/SchedulerFactory.java    |  26 +-
 .../main/thrift/RemoteInterpreterService.thrift |   3 +-
 .../zeppelin/scheduler/FIFOSchedulerTest.java   |   2 +-
 .../zeppelin/scheduler/RemoteSchedulerTest.java |  90 +++
 .../nflabs/zeppelin/socket/NotebookServer.java  |   2 +-
 13 files changed, 1311 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
index 0d09bc1..0703320 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -12,7 +12,6 @@ import com.nflabs.zeppelin.scheduler.Scheduler;
 public class LazyOpenInterpreter
     extends Interpreter
     implements WrappedInterpreter {
-
   private Interpreter intp;
   boolean opened = false;
 
@@ -47,7 +46,7 @@ public class LazyOpenInterpreter
       return;
     }
 
-    synchronized (this) {
+    synchronized (intp) {
       if (opened == false) {
         intp.open();
         opened = true;
@@ -57,7 +56,7 @@ public class LazyOpenInterpreter
 
   @Override
   public void close() {
-    synchronized (this) {
+    synchronized (intp) {
       if (opened == true) {
         intp.close();
         opened = false;
@@ -66,7 +65,7 @@ public class LazyOpenInterpreter
   }
 
   public boolean isOpen() {
-    synchronized (this) {
+    synchronized (intp) {
       return opened;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
index 7d7b801..7d98a48 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -291,8 +291,10 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetParallelScheduler(
-        "remoteinterpreter_" + this.hashCode(), 10);
+    int maxConcurrency = 10;
+
+    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+        "remoteinterpreter_" + this.hashCode(), getInterpreterProcess(), maxConcurrency);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d81a657..8ab13a4 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -31,6 +31,7 @@ import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import com.nflabs.zeppelin.scheduler.Job;
 import com.nflabs.zeppelin.scheduler.Job.Status;
 import com.nflabs.zeppelin.scheduler.JobListener;
+import com.nflabs.zeppelin.scheduler.JobProgressPoller;
 import com.nflabs.zeppelin.scheduler.Scheduler;
 
 
@@ -158,8 +159,10 @@ public class RemoteInterpreterServer
     Scheduler scheduler = intp.getScheduler();
     InterpretJobListener jobListener = new InterpretJobListener();
     InterpretJob job = new InterpretJob(
+        interpreterContext.getParagraphId(),
         "remoteInterpretJob_" + System.currentTimeMillis(),
         jobListener,
+        JobProgressPoller.DEFAULT_INTERVAL_MSEC,
         intp,
         st,
         context);
@@ -214,12 +217,14 @@ public class RemoteInterpreterServer
     private InterpreterContext context;
 
     public InterpretJob(
+        String jobId,
         String jobName,
         JobListener listener,
+        long progressUpdateIntervalMsec,
         Interpreter interpreter,
         String script,
         InterpreterContext context) {
-      super(jobName, listener);
+      super(jobId, jobName, listener, progressUpdateIntervalMsec);
       this.interpreter = interpreter;
       this.script = script;
       this.context = context;
@@ -294,4 +299,25 @@ public class RemoteInterpreterServer
         gson.toJson(config),
         gson.toJson(gui));
   }
+
+  @Override
+  public String getStatus(String jobId)
+      throws TException {
+    synchronized (interpreterGroup) {
+      for (Interpreter intp : interpreterGroup) {
+        for (Job job : intp.getScheduler().getJobsRunning()) {
+          if (jobId.equals(job.getId())) {
+            return job.getStatus().name();
+          }
+        }
+
+        for (Job job : intp.getScheduler().getJobsWaiting()) {
+          if (jobId.equals(job.getId())) {
+            return job.getStatus().name();
+          }
+        }
+      }
+    }
+    return "Unknown";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index b07af47..eed35c4 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -52,6 +52,8 @@ public class RemoteInterpreterService {
 
     public void shutdown() throws org.apache.thrift.TException;
 
+    public String getStatus(String jobId) throws org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface {
@@ -74,6 +76,8 @@ public class RemoteInterpreterService {
 
     public void shutdown(org.apache.thrift.async.AsyncMethodCallback<AsyncClient.shutdown_call> resultHandler) throws org.apache.thrift.TException;
 
+    public void getStatus(String jobId, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getStatus_call> resultHandler) throws org.apache.thrift.TException;
+
   }
 
   public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -294,6 +298,29 @@ public class RemoteInterpreterService {
       return;
     }
 
+    public String getStatus(String jobId) throws org.apache.thrift.TException
+    {
+      send_getStatus(jobId);
+      return recv_getStatus();
+    }
+
+    public void send_getStatus(String jobId) throws org.apache.thrift.TException
+    {
+      getStatus_args args = new getStatus_args();
+      args.setJobId(jobId);
+      sendBase("getStatus", args);
+    }
+
+    public String recv_getStatus() throws org.apache.thrift.TException
+    {
+      getStatus_result result = new getStatus_result();
+      receiveBase(result, "getStatus");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getStatus failed: unknown result");
+    }
+
   }
   public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
     public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -618,6 +645,38 @@ public class RemoteInterpreterService {
       }
     }
 
+    public void getStatus(String jobId, org.apache.thrift.async.AsyncMethodCallback<getStatus_call> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      getStatus_call method_call = new getStatus_call(jobId, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class getStatus_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String jobId;
+      public getStatus_call(String jobId, org.apache.thrift.async.AsyncMethodCallback<getStatus_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.jobId = jobId;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getStatus", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        getStatus_args args = new getStatus_args();
+        args.setJobId(jobId);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public String getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_getStatus();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -640,6 +699,7 @@ public class RemoteInterpreterService {
       processMap.put("getFormType", new getFormType());
       processMap.put("completion", new completion());
       processMap.put("shutdown", new shutdown());
+      processMap.put("getStatus", new getStatus());
       return processMap;
     }
 
@@ -824,6 +884,26 @@ public class RemoteInterpreterService {
       }
     }
 
+    public static class getStatus<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getStatus_args> {
+      public getStatus() {
+        super("getStatus");
+      }
+
+      public getStatus_args getEmptyArgsInstance() {
+        return new getStatus_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public getStatus_result getResult(I iface, getStatus_args args) throws org.apache.thrift.TException {
+        getStatus_result result = new getStatus_result();
+        result.success = iface.getStatus(args.jobId);
+        return result;
+      }
+    }
+
   }
 
   public static class createInterpreter_args implements org.apache.thrift.TBase<createInterpreter_args, createInterpreter_args._Fields>, java.io.Serializable, Cloneable   {
@@ -7383,4 +7463,712 @@ public class RemoteInterpreterService {
 
   }
 
+  public static class getStatus_args implements org.apache.thrift.TBase<getStatus_args, getStatus_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_args");
+
+    private static final org.apache.thrift.protocol.TField JOB_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("jobId", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new getStatus_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new getStatus_argsTupleSchemeFactory());
+    }
+
+    public String jobId; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      JOB_ID((short)1, "jobId");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // JOB_ID
+            return JOB_ID;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.JOB_ID, new org.apache.thrift.meta_data.FieldMetaData("jobId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_args.class, metaDataMap);
+    }
+
+    public getStatus_args() {
+    }
+
+    public getStatus_args(
+      String jobId)
+    {
+      this();
+      this.jobId = jobId;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public getStatus_args(getStatus_args other) {
+      if (other.isSetJobId()) {
+        this.jobId = other.jobId;
+      }
+    }
+
+    public getStatus_args deepCopy() {
+      return new getStatus_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.jobId = null;
+    }
+
+    public String getJobId() {
+      return this.jobId;
+    }
+
+    public getStatus_args setJobId(String jobId) {
+      this.jobId = jobId;
+      return this;
+    }
+
+    public void unsetJobId() {
+      this.jobId = null;
+    }
+
+    /** Returns true if field jobId is set (has been assigned a value) and false otherwise */
+    public boolean isSetJobId() {
+      return this.jobId != null;
+    }
+
+    public void setJobIdIsSet(boolean value) {
+      if (!value) {
+        this.jobId = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case JOB_ID:
+        if (value == null) {
+          unsetJobId();
+        } else {
+          setJobId((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case JOB_ID:
+        return getJobId();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case JOB_ID:
+        return isSetJobId();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof getStatus_args)
+        return this.equals((getStatus_args)that);
+      return false;
+    }
+
+    public boolean equals(getStatus_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_jobId = true && this.isSetJobId();
+      boolean that_present_jobId = true && that.isSetJobId();
+      if (this_present_jobId || that_present_jobId) {
+        if (!(this_present_jobId && that_present_jobId))
+          return false;
+        if (!this.jobId.equals(that.jobId))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(getStatus_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      getStatus_args typedOther = (getStatus_args)other;
+
+      lastComparison = Boolean.valueOf(isSetJobId()).compareTo(typedOther.isSetJobId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetJobId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jobId, typedOther.jobId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("getStatus_args(");
+      boolean first = true;
+
+      sb.append("jobId:");
+      if (this.jobId == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.jobId);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class getStatus_argsStandardSchemeFactory implements SchemeFactory {
+      public getStatus_argsStandardScheme getScheme() {
+        return new getStatus_argsStandardScheme();
+      }
+    }
+
+    private static class getStatus_argsStandardScheme extends StandardScheme<getStatus_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // JOB_ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.jobId = iprot.readString();
+                struct.setJobIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, getStatus_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.jobId != null) {
+          oprot.writeFieldBegin(JOB_ID_FIELD_DESC);
+          oprot.writeString(struct.jobId);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class getStatus_argsTupleSchemeFactory implements SchemeFactory {
+      public getStatus_argsTupleScheme getScheme() {
+        return new getStatus_argsTupleScheme();
+      }
+    }
+
+    private static class getStatus_argsTupleScheme extends TupleScheme<getStatus_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetJobId()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetJobId()) {
+          oprot.writeString(struct.jobId);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.jobId = iprot.readString();
+          struct.setJobIdIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class getStatus_result implements org.apache.thrift.TBase<getStatus_result, getStatus_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new getStatus_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new getStatus_resultTupleSchemeFactory());
+    }
+
+    public String success; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_result.class, metaDataMap);
+    }
+
+    public getStatus_result() {
+    }
+
+    public getStatus_result(
+      String success)
+    {
+      this();
+      this.success = success;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public getStatus_result(getStatus_result other) {
+      if (other.isSetSuccess()) {
+        this.success = other.success;
+      }
+    }
+
+    public getStatus_result deepCopy() {
+      return new getStatus_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.success = null;
+    }
+
+    public String getSuccess() {
+      return this.success;
+    }
+
+    public getStatus_result setSuccess(String success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof getStatus_result)
+        return this.equals((getStatus_result)that);
+      return false;
+    }
+
+    public boolean equals(getStatus_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(getStatus_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      getStatus_result typedOther = (getStatus_result)other;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    /** Renders the struct as {@code getStatus_result(success:<value or null>)}. */
+    @Override
+    public String toString() {
+      final String shown = (this.success == null) ? "null" : this.success;
+      return new StringBuilder("getStatus_result(")
+          .append("success:")
+          .append(shown)
+          .append(")")
+          .toString();
+    }
+
+    /**
+     * Validation hook invoked by the schemes before writing and after reading.
+     * No checks are performed here: neither the required-field section nor the
+     * sub-struct section contains any validation for this struct.
+     */
+    public void validate() throws org.apache.thrift.TException {
+      // required fields: nothing to check
+      // sub-struct validity: nothing to check
+    }
+
+    /**
+     * Java serialization hook: writes this struct to the object stream with the Thrift compact
+     * protocol, rethrowing any {@code TException} as an {@code IOException} with the cause kept.
+     */
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        org.apache.thrift.transport.TIOStreamTransport transport =
+            new org.apache.thrift.transport.TIOStreamTransport(out);
+        org.apache.thrift.protocol.TCompactProtocol protocol =
+            new org.apache.thrift.protocol.TCompactProtocol(transport);
+        write(protocol);
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    /**
+     * Java deserialization hook: reads this struct from the object stream with the Thrift compact
+     * protocol, rethrowing any {@code TException} as an {@code IOException} with the cause kept.
+     */
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        org.apache.thrift.transport.TIOStreamTransport transport =
+            new org.apache.thrift.transport.TIOStreamTransport(in);
+        org.apache.thrift.protocol.TCompactProtocol protocol =
+            new org.apache.thrift.protocol.TCompactProtocol(transport);
+        read(protocol);
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    /** Factory handing out a new {@code getStatus_resultStandardScheme} on every call. */
+    private static class getStatus_resultStandardSchemeFactory implements SchemeFactory {
+      public getStatus_resultStandardScheme getScheme() {
+        final getStatus_resultStandardScheme scheme = new getStatus_resultStandardScheme();
+        return scheme;
+      }
+    }
+
+    private static class getStatus_resultStandardScheme extends StandardScheme<getStatus_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.success = iprot.readString();
+                struct.setSuccessIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, getStatus_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.success != null) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          oprot.writeString(struct.success);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    /** Factory handing out a new {@code getStatus_resultTupleScheme} on every call. */
+    private static class getStatus_resultTupleSchemeFactory implements SchemeFactory {
+      public getStatus_resultTupleScheme getScheme() {
+        final getStatus_resultTupleScheme scheme = new getStatus_resultTupleScheme();
+        return scheme;
+      }
+    }
+
+    /**
+     * Tuple wire format for {@code getStatus_result}: a one-bit presence set followed by the
+     * {@code success} string when bit 0 is set.
+     */
+    private static class getStatus_resultTupleScheme extends TupleScheme<getStatus_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, getStatus_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        final boolean hasSuccess = struct.isSetSuccess();
+
+        // bit 0 announces whether the success value follows
+        BitSet optionals = new BitSet();
+        if (hasSuccess) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+
+        if (hasSuccess) {
+          oprot.writeString(struct.success);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, getStatus_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (!incoming.get(0)) {
+          // sender did not include a success value
+          return;
+        }
+        struct.success = iprot.readString();
+        struct.setSuccessIsSet(true);
+      }
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
index 13b2355..078cd3c 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java
@@ -9,7 +9,7 @@ import com.nflabs.zeppelin.scheduler.Job.Status;
 
 /**
  * TODO(moon) : add description.
- * 
+ *
  * @author Leemoonsoo
  *
  */
@@ -27,10 +27,12 @@ public class FIFOScheduler implements Scheduler {
     this.listener = listener;
   }
 
+  @Override
   public String getName() {
     return name;
   }
 
+  @Override
   public Collection<Job> getJobsWaiting() {
     List<Job> ret = new LinkedList<Job>();
     synchronized (queue) {
@@ -41,6 +43,7 @@ public class FIFOScheduler implements Scheduler {
     return ret;
   }
 
+  @Override
   public Collection<Job> getJobsRunning() {
     List<Job> ret = new LinkedList<Job>();
     Job job = runningJob;
@@ -54,6 +57,7 @@ public class FIFOScheduler implements Scheduler {
 
 
 
+  @Override
   public void submit(Job job) {
     job.setStatus(Status.PENDING);
     synchronized (queue) {
@@ -62,6 +66,7 @@ public class FIFOScheduler implements Scheduler {
     }
   }
 
+  @Override
   public void run() {
 
     synchronized (queue) {
@@ -78,14 +83,36 @@ public class FIFOScheduler implements Scheduler {
 
         final Scheduler scheduler = this;
         this.executor.execute(new Runnable() {
+          @Override
           public void run() {
+            if (runningJob.isAborted()) {
+              runningJob.setStatus(Status.ABORT);
+              runningJob.aborted = false;
+              synchronized (queue) {
+                queue.notify();
+              }
+              return;
+            }
+
+            runningJob.setStatus(Status.RUNNING);
             if (listener != null) {
               listener.jobStarted(scheduler, runningJob);
             }
             runningJob.run();
+            if (runningJob.isAborted()) {
+              runningJob.setStatus(Status.ABORT);
+            } else {
+              if (runningJob.getException() != null) {
+                runningJob.setStatus(Status.ERROR);
+              } else {
+                runningJob.setStatus(Status.FINISHED);
+              }
+            }
             if (listener != null) {
               listener.jobFinished(scheduler, runningJob);
             }
+            // reset aborted flag to allow retry
+            runningJob.aborted = false;
             runningJob = null;
             synchronized (queue) {
               queue.notify();
@@ -96,6 +123,7 @@ public class FIFOScheduler implements Scheduler {
     }
   }
 
+  @Override
   public void stop() {
     terminate = true;
     synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
index 0ca73f8..00465b3 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java
@@ -14,11 +14,11 @@ import org.slf4j.LoggerFactory;
  *  - should be run on a separate thread
  *  - maintains internal state: it's status
  *  - supports listeners who are updated on status change
- *  
+ *
  *  Job class is serialized/deserialized and used server<->client communication
  *  and saving/loading jobs from disk.
  *  Changing/adding/deleting non transitive field name need consideration of that.
- *  
+ *
  *  @author Leemoonsoo
  */
 public abstract class Job {
@@ -68,10 +68,6 @@ public abstract class Job {
   private transient JobListener listener;
   private long progressUpdateIntervalMs;
 
-  public Job(String jobName, JobListener listener) {
-    this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC);
-  }
-
   public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) {
     this.jobName = jobName;
     this.listener = listener;
@@ -84,14 +80,30 @@ public abstract class Job {
     setStatus(Status.READY);
   }
 
+  /**
+   * Creates a job that uses {@code JobProgressPoller.DEFAULT_INTERVAL_MSEC} as its
+   * progress update interval.
+   */
+  public Job(String jobName, JobListener listener) {
+    this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC);
+  }
+
+  /**
+   * Creates a job with a caller-supplied id instead of a generated one, and puts it in
+   * {@code READY} status.
+   */
+  public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) {
+    this.id = jobId;
+    this.jobName = jobName;
+    this.listener = listener;
+    this.progressUpdateIntervalMs = progressUpdateIntervalMs;
+
+    // kept last, after every field above has been assigned
+    setStatus(Status.READY);
+  }
+
   public String getId() {
     return id;
   }
 
+  @Override
   public int hashCode() {
     return id.hashCode();
   }
 
+  @Override
   public boolean equals(Object o) {
     return ((Job) o).hashCode() == hashCode();
   }
@@ -132,26 +144,16 @@ public abstract class Job {
   }
 
   public void run() {
-    if (aborted) {
-      setStatus(Status.ABORT);
-      aborted = false;
-      return;
-    }
     JobProgressPoller progressUpdator = null;
     try {
-
-      setStatus(Status.RUNNING);
       progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs);
       progressUpdator.start();
       dateStarted = new Date();
       result = jobRun();
+      this.exception = null;
+      errorMessage = null;
       dateFinished = new Date();
       progressUpdator.terminate();
-      if (aborted) {
-        setStatus(Status.ABORT);
-      } else {
-        setStatus(Status.FINISHED);
-      }
     } catch (NullPointerException e) {
       logger().error("Job failed", e);
       progressUpdator.terminate();
@@ -159,7 +161,6 @@ public abstract class Job {
       result = e.getMessage();
       errorMessage = getStack(e);
       dateFinished = new Date();
-      setStatus(Status.ERROR);
     } catch (Throwable e) {
       logger().error("Job failed", e);
       progressUpdator.terminate();
@@ -167,9 +168,8 @@ public abstract class Job {
       result = e.getMessage();
       errorMessage = getStack(e);
       dateFinished = new Date();
-      setStatus(Status.ERROR);
     } finally {
-      aborted = false;
+      //aborted = false;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
index 39bb3f7..d28e125 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java
@@ -9,7 +9,7 @@ import com.nflabs.zeppelin.scheduler.Job.Status;
 
 /**
  * TODO(moon) : add description.
- * 
+ *
  * @author Leemoonsoo
  *
  */
@@ -30,10 +30,12 @@ public class ParallelScheduler implements Scheduler {
     this.maxConcurrency = maxConcurrency;
   }
 
+  @Override
   public String getName() {
     return name;
   }
 
+  @Override
   public Collection<Job> getJobsWaiting() {
     List<Job> ret = new LinkedList<Job>();
     synchronized (queue) {
@@ -44,6 +46,7 @@ public class ParallelScheduler implements Scheduler {
     return ret;
   }
 
+  @Override
   public Collection<Job> getJobsRunning() {
     List<Job> ret = new LinkedList<Job>();
     synchronized (queue) {
@@ -56,6 +59,7 @@ public class ParallelScheduler implements Scheduler {
 
 
 
+  @Override
   public void submit(Job job) {
     job.setStatus(Status.PENDING);
     synchronized (queue) {
@@ -64,6 +68,7 @@ public class ParallelScheduler implements Scheduler {
     }
   }
 
+  @Override
   public void run() {
 
     synchronized (queue) {
@@ -103,14 +108,41 @@ public class ParallelScheduler implements Scheduler {
       this.job = job;
     }
 
+    @Override
     public void run() {
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+        job.aborted = false;
+
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+
+        return;
+      }
+
+      job.setStatus(Status.RUNNING);
       if (listener != null) {
         listener.jobStarted(scheduler, job);
       }
       job.run();
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+      } else {
+        if (job.getException() != null) {
+          job.setStatus(Status.ERROR);
+        } else {
+          job.setStatus(Status.FINISHED);
+        }
+      }
+
       if (listener != null) {
         listener.jobFinished(scheduler, job);
       }
+
+      // reset aborted flag to allow retry
+      job.aborted = false;
       synchronized (queue) {
         running.remove(job);
         queue.notify();
@@ -119,6 +151,7 @@ public class ParallelScheduler implements Scheduler {
   }
 
 
+  @Override
   public void stop() {
     terminate = true;
     synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
new file mode 100644
index 0000000..bb02f13
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
@@ -0,0 +1,286 @@
+package com.nflabs.zeppelin.scheduler;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import com.nflabs.zeppelin.scheduler.Job.Status;
+
+/**
+ *
+ */
+public class RemoteScheduler implements Scheduler {
+  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
+
+  List<Job> queue = new LinkedList<Job>();
+  List<Job> running = new LinkedList<Job>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+  private RemoteInterpreterProcess interpreterProcess;
+
+  /**
+   * Stores the collaborators of this scheduler; no thread is started here.
+   *
+   * @param name scheduler name returned by {@link #getName()}
+   * @param executor executor on which job runners are executed
+   * @param interpreterProcess remote process queried for job status
+   * @param listener notified when a job starts and finishes; may be null
+   * @param maxConcurrency upper bound on the number of jobs in the running list
+   */
+  public RemoteScheduler(String name,
+      ExecutorService executor,
+      RemoteInterpreterProcess interpreterProcess,
+      SchedulerListener listener,
+      int maxConcurrency) {
+    this.name = name;
+    this.maxConcurrency = maxConcurrency;
+    this.executor = executor;
+    this.interpreterProcess = interpreterProcess;
+    this.listener = listener;
+  }
+
+  /**
+   * Dispatch loop: while not terminated, moves the head of the waiting queue to the running
+   * list and hands it to the executor whenever there is a free slot; otherwise waits on the
+   * queue monitor for up to 500 ms.
+   */
+  @Override
+  public void run() {
+    synchronized (queue) {
+      while (!terminate) {
+        final boolean atCapacity = running.size() >= maxConcurrency;
+        if (!atCapacity && !queue.isEmpty()) {
+          final Job next = queue.remove(0);
+          running.add(next);
+          executor.execute(new JobRunner(this, next));
+          continue;
+        }
+
+        // nothing to dispatch right now: sleep until notified or the timeout elapses
+        try {
+          queue.wait(500);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  /** Returns the name given to this scheduler at construction time. */
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Returns a snapshot of the jobs that have been submitted but not yet handed to the executor.
+   *
+   * <p>The copy is taken while holding the queue monitor, the same lock under which the
+   * waiting queue is modified, so callers never observe a half-updated list and never
+   * receive {@code null}.
+   *
+   * @return a new list holding the currently waiting jobs; empty when none are waiting
+   */
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    synchronized (queue) {
+      return new LinkedList<Job>(queue);
+    }
+  }
+
+  /**
+   * Returns a snapshot of the jobs that have been handed to the executor and not yet removed
+   * from the running list.
+   *
+   * <p>The running list is only modified while holding the queue monitor, so the copy is
+   * taken under that same lock. Callers never receive {@code null}.
+   *
+   * @return a new list holding the currently running jobs; empty when none are running
+   */
+  @Override
+  public Collection<Job> getJobsRunning() {
+    synchronized (queue) {
+      return new LinkedList<Job>(running);
+    }
+  }
+
+  /**
+   * Marks the job {@code PENDING}, appends it to the waiting queue and wakes up the
+   * dispatch loop.
+   */
+  @Override
+  public void submit(Job job) {
+    // status is changed before the queue lock is taken
+    job.setStatus(Status.PENDING);
+
+    final List<Job> waiting = queue;
+    synchronized (waiting) {
+      waiting.add(job);
+      waiting.notify();
+    }
+  }
+
+  /**
+   * Changes the concurrency limit and wakes up the dispatch loop so the new limit is
+   * looked at without waiting for its timeout.
+   */
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+
+    final List<Job> monitor = queue;
+    synchronized (monitor) {
+      monitor.notify();
+    }
+  }
+
+  /**
+   * Role of the class is get status info from remote process
+   * from PENDING to RUNNING status.
+   */
+  private class JobStatusPoller extends Thread {
+    private long initialPeriodMsec;
+    private long initialPeriodCheckIntervalMsec;
+    private long checkIntervalMsec;
+    private boolean terminate;
+    private JobListener listener;
+    private Job job;
+
+    public JobStatusPoller(
+        long initialPeriodMsec,
+        long initialPeriodCheckIntervalMsec,
+        long checkIntervalMsec,
+        Job job,
+        JobListener listener
+    ) {
+      this.initialPeriodMsec = initialPeriodMsec;
+      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
+      this.checkIntervalMsec = checkIntervalMsec;
+      this.job = job;
+      this.listener = listener;
+      this.terminate = false;
+    }
+
+    @Override
+    public void run() {
+      long started = System.currentTimeMillis();
+      while (terminate == false) {
+        long current = System.currentTimeMillis();
+        long interval;
+        if (current - started < initialPeriodMsec) {
+          interval = initialPeriodCheckIntervalMsec;
+        } else {
+          interval = checkIntervalMsec;
+        }
+
+        synchronized (this) {
+          try {
+            this.wait(interval);
+          } catch (InterruptedException e) {
+          }
+        }
+
+        Status newStatus = getStatus();
+        if (newStatus == null) {
+          continue;
+        }
+
+        // update only RUNNING
+        if (newStatus == Status.RUNNING) {
+          listener.afterStatusChange(job, null, newStatus);
+          break;
+        }
+
+        if (newStatus != Status.READY &&
+            newStatus != Status.PENDING) {
+          // we don't need more
+          continue;
+        }
+      }
+    }
+
+    public void shutdown() {
+      terminate = true;
+      synchronized (this) {
+        this.notify();
+      }
+    }
+
+    public synchronized Job.Status getStatus() {
+      if (interpreterProcess.referenceCount() <= 0) {
+        return null;
+      }
+
+      Client client;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e) {
+        logger.error("Can't get status information", e);
+        return Status.FINISHED;
+      }
+
+      try {
+        Status status = Status.valueOf(client.getStatus(job.getId()));
+        logger.info("getStatus from remote {}", status);
+        return status;
+      } catch (TException e) {
+        logger.error("Can't get status information", e);
+        return Status.FINISHED;
+      } catch (Exception e) {
+        // unknown status
+        return Status.FINISHED;
+      } finally {
+        interpreterProcess.releaseClient(client);
+      }
+    }
+  }
+
+
+  private class JobRunner implements Runnable, JobListener {
+    private Scheduler scheduler;
+    private Job job;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+    }
+
+    @Override
+    public void run() {
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+        job.aborted = false;
+
+        synchronized (queue) {
+          running.remove(job);
+          queue.notify();
+        }
+
+        return;
+      }
+
+
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(
+          1500,
+          100,
+          500,
+          job,
+          this
+      );
+      logger.info("*********** Start job status poller");
+      jobStatusPoller.start();
+
+      if (listener != null) {
+        listener.jobStarted(scheduler, job);
+      }
+      job.run();
+
+      jobStatusPoller.shutdown();
+
+      job.setStatus(jobStatusPoller.getStatus());
+
+      if (listener != null) {
+        listener.jobFinished(scheduler, job);
+      }
+
+      // reset aborted flag to allow retry
+      job.aborted = false;
+
+      synchronized (queue) {
+        running.remove(job);
+        queue.notify();
+      }
+    }
+
+    @Override
+    public void onProgressUpdate(Job job, int progress) {
+    }
+
+    @Override
+    public void beforeStatusChange(Job job, Status before, Status after) {
+    }
+
+    @Override
+    public void afterStatusChange(Job job, Status before, Status after) {
+      // status polled by status poller
+      if (job.getStatus() == after) {
+        return;
+      }
+
+      job.setStatus(after);
+    }
+  }
+
+
+  /** Sets the terminate flag and wakes up the dispatch loop so it can observe the flag. */
+  @Override
+  public void stop() {
+    terminate = true;
+
+    final List<Job> monitor = queue;
+    synchronized (monitor) {
+      monitor.notify();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
index c79f8e2..115e2b1 100644
--- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java
@@ -11,9 +11,11 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+
 /**
  * TODO(moon) : add description.
- * 
+ *
  * @author Leemoonsoo
  *
  */
@@ -70,6 +72,26 @@ public class SchedulerFactory implements SchedulerListener {
     }
   }
 
+  /**
+   * Returns the scheduler registered under {@code name}. When none is registered yet, a
+   * {@link RemoteScheduler} bound to {@code interpreterProcess} is created, registered and
+   * started on the factory's executor first.
+   */
+  public Scheduler createOrGetRemoteScheduler(
+      String name,
+      RemoteInterpreterProcess interpreterProcess,
+      int maxConcurrency) {
+
+    synchronized (schedulers) {
+      if (!schedulers.containsKey(name)) {
+        Scheduler created =
+            new RemoteScheduler(name, executor, interpreterProcess, this, maxConcurrency);
+        schedulers.put(name, created);
+        executor.execute(created);
+      }
+      return schedulers.get(name);
+    }
+  }
+
   public Scheduler removeScheduler(String name) {
     synchronized (schedulers) {
       Scheduler s = schedulers.remove(name);
@@ -90,11 +112,13 @@ public class SchedulerFactory implements SchedulerListener {
     return s;
   }
 
+  @Override
   public void jobStarted(Scheduler scheduler, Job job) {
     logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
 
   }
 
+  @Override
   public void jobFinished(Scheduler scheduler, Job job) {
     logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 53bda30..bbb54b1 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -27,6 +27,7 @@ service RemoteInterpreterService {
   i32 getProgress(1: string className, 2: RemoteInterpreterContext interpreterContext);
   string getFormType(1: string className);
   list<string> completion(1: string className, 2: string buf, 3: i32 cursor);
-
   void shutdown();
+
+  string getStatus(1:string jobId);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
index 0641aca..37a29d1 100644
--- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java
@@ -4,7 +4,7 @@ import junit.framework.TestCase;
 
 import com.nflabs.zeppelin.scheduler.Job.Status;
 
-public class FIFOSchedulerTest extends TestCase{
+public class FIFOSchedulerTest extends TestCase {
 
 	private SchedulerFactory schedulerSvc;
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
new file mode 100644
index 0000000..8695038
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -0,0 +1,90 @@
+package com.nflabs.zeppelin.scheduler;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.nflabs.zeppelin.display.GUI;
+import com.nflabs.zeppelin.interpreter.InterpreterContext;
+import com.nflabs.zeppelin.interpreter.InterpreterGroup;
+import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreter;
+import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
+
+/**
+ * Submits one job to a {@link RemoteScheduler} backed by a mock remote interpreter and checks
+ * that the scheduler actually executes it.
+ */
+public class RemoteSchedulerTest {
+
+  private SchedulerFactory schedulerSvc;
+
+  @Before
+  public void setUp() throws Exception{
+    schedulerSvc = new SchedulerFactory();
+  }
+
+  @After
+  public void tearDown(){
+
+  }
+
+  @Test
+  public void test() throws Exception {
+    Properties p = new Properties();
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    try {
+      Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
+          intpA.getInterpreterProcess(),
+          10);
+
+      // released when the job body has run, whether it succeeded or threw
+      final java.util.concurrent.CountDownLatch jobBodyDone =
+          new java.util.concurrent.CountDownLatch(1);
+
+      scheduler.submit(new Job("jobName", null) {
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          try {
+            intpA.interpret("500", new InterpreterContext(
+                "id",
+                "title",
+                "text",
+                new HashMap<String, Object>(),
+                new GUI()));
+            return "500";
+          } finally {
+            jobBodyDone.countDown();
+          }
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+
+      // wait for the job before tearing anything down; otherwise the interpreter could be
+      // closed while the job is still queued and the test would verify nothing
+      org.junit.Assert.assertTrue(
+          "submitted job was never executed by the remote scheduler",
+          jobBodyDone.await(30, java.util.concurrent.TimeUnit.SECONDS));
+    } finally {
+      // always release the remote interpreter and the scheduler, even when the test fails
+      try {
+        intpA.close();
+      } finally {
+        schedulerSvc.removeScheduler("test");
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
index d86f860..f38dc65 100644
--- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
@@ -272,7 +272,7 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
         notebook.refreshCron(note.id());
       }
       note.persist();
-      
+
       broadcastNote(note);
       broadcastNoteList();
     }


Mime
View raw message