Return-Path: X-Original-To: apmail-zeppelin-commits-archive@minotaur.apache.org Delivered-To: apmail-zeppelin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 86B7F176D7 for ; Fri, 20 Mar 2015 18:02:14 +0000 (UTC) Received: (qmail 17634 invoked by uid 500); 20 Mar 2015 18:02:08 -0000 Delivered-To: apmail-zeppelin-commits-archive@zeppelin.apache.org Received: (qmail 17603 invoked by uid 500); 20 Mar 2015 18:02:08 -0000 Mailing-List: contact commits-help@zeppelin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zeppelin.incubator.apache.org Delivered-To: mailing list commits@zeppelin.incubator.apache.org Received: (qmail 17594 invoked by uid 99); 20 Mar 2015 18:02:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 18:02:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Mar 2015 18:01:28 +0000 Received: (qmail 12553 invoked by uid 99); 20 Mar 2015 18:01:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 18:01:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C9D11E1948; Fri, 20 Mar 2015 18:01:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: moon@apache.org To: commits@zeppelin.incubator.apache.org Date: Fri, 20 Mar 2015 18:01:24 -0000 Message-Id: <798477ca7fb947c894ffa40596d35f66@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/50] incubator-zeppelin git commit: Implement RemoteScheduler X-Virus-Checked: Checked by ClamAV on apache.org Implement RemoteScheduler Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/7d3f55de Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/7d3f55de Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/7d3f55de Branch: refs/heads/master Commit: 7d3f55defd3acce5c3d98a3366766d78e908aaec Parents: 3490f2e Author: Lee moon soo Authored: Wed Mar 11 15:08:29 2015 +0900 Committer: Lee moon soo Committed: Wed Mar 11 15:08:29 2015 +0900 ---------------------------------------------------------------------- .../interpreter/LazyOpenInterpreter.java | 7 +- .../interpreter/remote/RemoteInterpreter.java | 6 +- .../remote/RemoteInterpreterServer.java | 28 +- .../thrift/RemoteInterpreterService.java | 788 +++++++++++++++++++ .../zeppelin/scheduler/FIFOScheduler.java | 30 +- .../java/com/nflabs/zeppelin/scheduler/Job.java | 42 +- .../zeppelin/scheduler/ParallelScheduler.java | 35 +- .../zeppelin/scheduler/RemoteScheduler.java | 286 +++++++ .../zeppelin/scheduler/SchedulerFactory.java | 26 +- .../main/thrift/RemoteInterpreterService.thrift | 3 +- .../zeppelin/scheduler/FIFOSchedulerTest.java | 2 +- .../zeppelin/scheduler/RemoteSchedulerTest.java | 90 +++ .../nflabs/zeppelin/socket/NotebookServer.java | 2 +- 13 files changed, 1311 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java index 0d09bc1..0703320 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java @@ -12,7 +12,6 @@ import com.nflabs.zeppelin.scheduler.Scheduler; public class LazyOpenInterpreter extends Interpreter implements WrappedInterpreter { - private Interpreter intp; boolean opened = false; @@ -47,7 +46,7 @@ public class LazyOpenInterpreter return; } - synchronized (this) { + synchronized (intp) { if (opened == false) { intp.open(); opened = true; @@ -57,7 +56,7 @@ public class LazyOpenInterpreter @Override public void close() { - synchronized (this) { + synchronized (intp) { if (opened == true) { intp.close(); opened = false; @@ -66,7 +65,7 @@ public class LazyOpenInterpreter } public boolean isOpen() { - synchronized (this) { + synchronized (intp) { return opened; } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java index 7d7b801..7d98a48 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -291,8 +291,10 @@ public class RemoteInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetParallelScheduler( - "remoteinterpreter_" + this.hashCode(), 10); + int maxConcurrency = 10; + + return SchedulerFactory.singleton().createOrGetRemoteScheduler( + "remoteinterpreter_" + this.hashCode(), getInterpreterProcess(), maxConcurrency); } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java index d81a657..8ab13a4 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -31,6 +31,7 @@ import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService; import com.nflabs.zeppelin.scheduler.Job; import com.nflabs.zeppelin.scheduler.Job.Status; import com.nflabs.zeppelin.scheduler.JobListener; +import com.nflabs.zeppelin.scheduler.JobProgressPoller; import com.nflabs.zeppelin.scheduler.Scheduler; @@ -158,8 +159,10 @@ public class RemoteInterpreterServer Scheduler scheduler = intp.getScheduler(); InterpretJobListener jobListener = new InterpretJobListener(); InterpretJob job = new InterpretJob( + interpreterContext.getParagraphId(), "remoteInterpretJob_" + System.currentTimeMillis(), jobListener, + JobProgressPoller.DEFAULT_INTERVAL_MSEC, intp, st, context); @@ -214,12 +217,14 @@ public class RemoteInterpreterServer private InterpreterContext context; public InterpretJob( + String jobId, String jobName, JobListener listener, + long progressUpdateIntervalMsec, Interpreter interpreter, String script, InterpreterContext context) { - super(jobName, listener); + super(jobId, jobName, listener, progressUpdateIntervalMsec); this.interpreter = interpreter; this.script = script; this.context = context; @@ -294,4 +299,25 @@ public class RemoteInterpreterServer gson.toJson(config), gson.toJson(gui)); } + + @Override + public String getStatus(String jobId) + throws TException { + synchronized (interpreterGroup) { + for (Interpreter intp : interpreterGroup) { + for (Job job : intp.getScheduler().getJobsRunning()) { + if (jobId.equals(job.getId())) { + return job.getStatus().name(); + } + } + + for (Job job : intp.getScheduler().getJobsWaiting()) { + if (jobId.equals(job.getId())) { + return job.getStatus().name(); + } + } + } + } + return "Unknown"; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java index b07af47..eed35c4 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -52,6 +52,8 @@ public class RemoteInterpreterService { public void shutdown() throws org.apache.thrift.TException; + public String getStatus(String jobId) throws org.apache.thrift.TException; + } public interface AsyncIface { @@ -74,6 +76,8 @@ public class RemoteInterpreterService { public void shutdown(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void getStatus(String jobId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + } public static class Client extends org.apache.thrift.TServiceClient implements Iface { @@ -294,6 +298,29 @@ public class RemoteInterpreterService { return; } + public String getStatus(String jobId) throws org.apache.thrift.TException + { + send_getStatus(jobId); + return recv_getStatus(); + } + + public void send_getStatus(String jobId) throws org.apache.thrift.TException + { + getStatus_args args = new getStatus_args(); + args.setJobId(jobId); + sendBase("getStatus", args); + } + + public String recv_getStatus() throws org.apache.thrift.TException + { + getStatus_result result = new getStatus_result(); + receiveBase(result, "getStatus"); + if (result.isSetSuccess()) { + return result.success; + } + throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getStatus failed: unknown result"); + } + } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory { @@ -618,6 +645,38 @@ public class RemoteInterpreterService { } } + public void getStatus(String jobId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + getStatus_call method_call = new getStatus_call(jobId, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class getStatus_call extends org.apache.thrift.async.TAsyncMethodCall { + private String jobId; + public getStatus_call(String jobId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.jobId = jobId; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getStatus", org.apache.thrift.protocol.TMessageType.CALL, 0)); + getStatus_args args = new getStatus_args(); + args.setJobId(jobId); + args.write(prot); + prot.writeMessageEnd(); + } + + public String getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_getStatus(); + } + } + } public static class Processor extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor { @@ -640,6 +699,7 @@ public class RemoteInterpreterService { processMap.put("getFormType", new getFormType()); processMap.put("completion", new completion()); processMap.put("shutdown", new shutdown()); + processMap.put("getStatus", new getStatus()); return processMap; } @@ -824,6 +884,26 @@ public class RemoteInterpreterService { } } + public static class getStatus extends org.apache.thrift.ProcessFunction { + public getStatus() { + super("getStatus"); + } + + public getStatus_args getEmptyArgsInstance() { + return new getStatus_args(); + } + + protected boolean isOneway() { + return false; + } + + public getStatus_result getResult(I iface, getStatus_args args) throws org.apache.thrift.TException { + getStatus_result result = new getStatus_result(); + result.success = iface.getStatus(args.jobId); + return result; + } + } + } public static class createInterpreter_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { @@ -7383,4 +7463,712 @@ public class RemoteInterpreterService { } + public static class getStatus_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_args"); + + private static final org.apache.thrift.protocol.TField JOB_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("jobId", org.apache.thrift.protocol.TType.STRING, (short)1); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new getStatus_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new getStatus_argsTupleSchemeFactory()); + } + + public String jobId; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + JOB_ID((short)1, "jobId"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // JOB_ID + return JOB_ID; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.JOB_ID, new org.apache.thrift.meta_data.FieldMetaData("jobId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_args.class, metaDataMap); + } + + public getStatus_args() { + } + + public getStatus_args( + String jobId) + { + this(); + this.jobId = jobId; + } + + /** + * Performs a deep copy on other. + */ + public getStatus_args(getStatus_args other) { + if (other.isSetJobId()) { + this.jobId = other.jobId; + } + } + + public getStatus_args deepCopy() { + return new getStatus_args(this); + } + + @Override + public void clear() { + this.jobId = null; + } + + public String getJobId() { + return this.jobId; + } + + public getStatus_args setJobId(String jobId) { + this.jobId = jobId; + return this; + } + + public void unsetJobId() { + this.jobId = null; + } + + /** Returns true if field jobId is set (has been assigned a value) and false otherwise */ + public boolean isSetJobId() { + return this.jobId != null; + } + + public void setJobIdIsSet(boolean value) { + if (!value) { + this.jobId = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case JOB_ID: + if (value == null) { + unsetJobId(); + } else { + setJobId((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case JOB_ID: + return getJobId(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case JOB_ID: + return isSetJobId(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof getStatus_args) + return this.equals((getStatus_args)that); + return false; + } + + public boolean equals(getStatus_args that) { + if (that == null) + return false; + + boolean this_present_jobId = true && this.isSetJobId(); + boolean that_present_jobId = true && that.isSetJobId(); + if (this_present_jobId || that_present_jobId) { + if (!(this_present_jobId && that_present_jobId)) + return false; + if (!this.jobId.equals(that.jobId)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(getStatus_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + getStatus_args typedOther = (getStatus_args)other; + + lastComparison = Boolean.valueOf(isSetJobId()).compareTo(typedOther.isSetJobId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetJobId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jobId, typedOther.jobId); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("getStatus_args("); + boolean first = true; + + sb.append("jobId:"); + if (this.jobId == null) { + sb.append("null"); + } else { + sb.append(this.jobId); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class getStatus_argsStandardSchemeFactory implements SchemeFactory { + public getStatus_argsStandardScheme getScheme() { + return new getStatus_argsStandardScheme(); + } + } + + private static class getStatus_argsStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // JOB_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.jobId = iprot.readString(); + struct.setJobIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, getStatus_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.jobId != null) { + oprot.writeFieldBegin(JOB_ID_FIELD_DESC); + oprot.writeString(struct.jobId); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class getStatus_argsTupleSchemeFactory implements SchemeFactory { + public getStatus_argsTupleScheme getScheme() { + return new getStatus_argsTupleScheme(); + } + } + + private static class getStatus_argsTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetJobId()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetJobId()) { + oprot.writeString(struct.jobId); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.jobId = iprot.readString(); + struct.setJobIdIsSet(true); + } + } + } + + } + + public static class getStatus_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_result"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0); + + private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new getStatus_resultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new getStatus_resultTupleSchemeFactory()); + } + + public String success; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_result.class, metaDataMap); + } + + public getStatus_result() { + } + + public getStatus_result( + String success) + { + this(); + this.success = success; + } + + /** + * Performs a deep copy on other. + */ + public getStatus_result(getStatus_result other) { + if (other.isSetSuccess()) { + this.success = other.success; + } + } + + public getStatus_result deepCopy() { + return new getStatus_result(this); + } + + @Override + public void clear() { + this.success = null; + } + + public String getSuccess() { + return this.success; + } + + public getStatus_result setSuccess(String success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof getStatus_result) + return this.equals((getStatus_result)that); + return false; + } + + public boolean equals(getStatus_result that) { + if (that == null) + return false; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(getStatus_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + getStatus_result typedOther = (getStatus_result)other; + + lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, typedOther.success); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("getStatus_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class getStatus_resultStandardSchemeFactory implements SchemeFactory { + public getStatus_resultStandardScheme getScheme() { + return new getStatus_resultStandardScheme(); + } + } + + private static class getStatus_resultStandardScheme extends StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.success = iprot.readString(); + struct.setSuccessIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, getStatus_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.success != null) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeString(struct.success); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class getStatus_resultTupleSchemeFactory implements SchemeFactory { + public getStatus_resultTupleScheme getScheme() { + return new getStatus_resultTupleScheme(); + } + } + + private static class getStatus_resultTupleScheme extends TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, getStatus_result struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetSuccess()) { + oprot.writeString(struct.success); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, getStatus_result struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.success = iprot.readString(); + struct.setSuccessIsSet(true); + } + } + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java index 13b2355..078cd3c 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/FIFOScheduler.java @@ -9,7 +9,7 @@ import com.nflabs.zeppelin.scheduler.Job.Status; /** * TODO(moon) : add description. - * + * * @author Leemoonsoo * */ @@ -27,10 +27,12 @@ public class FIFOScheduler implements Scheduler { this.listener = listener; } + @Override public String getName() { return name; } + @Override public Collection getJobsWaiting() { List ret = new LinkedList(); synchronized (queue) { @@ -41,6 +43,7 @@ public class FIFOScheduler implements Scheduler { return ret; } + @Override public Collection getJobsRunning() { List ret = new LinkedList(); Job job = runningJob; @@ -54,6 +57,7 @@ public class FIFOScheduler implements Scheduler { + @Override public void submit(Job job) { job.setStatus(Status.PENDING); synchronized (queue) { @@ -62,6 +66,7 @@ public class FIFOScheduler implements Scheduler { } } + @Override public void run() { synchronized (queue) { @@ -78,14 +83,36 @@ public class FIFOScheduler implements Scheduler { final Scheduler scheduler = this; this.executor.execute(new Runnable() { + @Override public void run() { + if (runningJob.isAborted()) { + runningJob.setStatus(Status.ABORT); + runningJob.aborted = false; + synchronized (queue) { + queue.notify(); + } + return; + } + + runningJob.setStatus(Status.RUNNING); if (listener != null) { listener.jobStarted(scheduler, runningJob); } runningJob.run(); + if (runningJob.isAborted()) { + runningJob.setStatus(Status.ABORT); + } else { + if (runningJob.getException() != null) { + runningJob.setStatus(Status.ERROR); + } else { + runningJob.setStatus(Status.FINISHED); + } + } if (listener != null) { listener.jobFinished(scheduler, runningJob); } + // reset aborted flag to allow retry + runningJob.aborted = false; runningJob = null; synchronized (queue) { queue.notify(); @@ -96,6 +123,7 @@ public class FIFOScheduler implements Scheduler { } } + @Override public void stop() { terminate = true; synchronized (queue) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java index 0ca73f8..00465b3 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/Job.java @@ -14,11 +14,11 @@ import org.slf4j.LoggerFactory; * - should be run on a separate thread * - maintains internal state: it's status * - supports listeners who are updated on status change - * + * * Job class is serialized/deserialized and used server<->client communication * and saving/loading jobs from disk. * Changing/adding/deleting non transitive field name need consideration of that. - * + * * @author Leemoonsoo */ public abstract class Job { @@ -68,10 +68,6 @@ public abstract class Job { private transient JobListener listener; private long progressUpdateIntervalMs; - public Job(String jobName, JobListener listener) { - this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC); - } - public Job(String jobName, JobListener listener, long progressUpdateIntervalMs) { this.jobName = jobName; this.listener = listener; @@ -84,14 +80,30 @@ public abstract class Job { setStatus(Status.READY); } + public Job(String jobName, JobListener listener) { + this(jobName, listener, JobProgressPoller.DEFAULT_INTERVAL_MSEC); + } + + public Job(String jobId, String jobName, JobListener listener, long progressUpdateIntervalMs) { + this.jobName = jobName; + this.listener = listener; + this.progressUpdateIntervalMs = progressUpdateIntervalMs; + + id = jobId; + + setStatus(Status.READY); + } + public String getId() { return id; } + @Override public int hashCode() { return id.hashCode(); } + @Override public boolean equals(Object o) { return ((Job) o).hashCode() == hashCode(); } @@ -132,26 +144,16 @@ public abstract class Job { } public void run() { - if (aborted) { - setStatus(Status.ABORT); - aborted = false; - return; - } JobProgressPoller progressUpdator = null; try { - - setStatus(Status.RUNNING); progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs); progressUpdator.start(); dateStarted = new Date(); result = jobRun(); + this.exception = null; + errorMessage = null; dateFinished = new Date(); progressUpdator.terminate(); - if (aborted) { - setStatus(Status.ABORT); - } else { - setStatus(Status.FINISHED); - } } catch (NullPointerException e) { logger().error("Job failed", e); progressUpdator.terminate(); @@ -159,7 +161,6 @@ public abstract class Job { result = e.getMessage(); errorMessage = getStack(e); dateFinished = new Date(); - setStatus(Status.ERROR); } catch (Throwable e) { logger().error("Job failed", e); progressUpdator.terminate(); @@ -167,9 +168,8 @@ public abstract class Job { result = e.getMessage(); errorMessage = getStack(e); dateFinished = new Date(); - setStatus(Status.ERROR); } finally { - aborted = false; + //aborted = false; } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java index 39bb3f7..d28e125 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/ParallelScheduler.java @@ -9,7 +9,7 @@ import com.nflabs.zeppelin.scheduler.Job.Status; /** * TODO(moon) : add description. - * + * * @author Leemoonsoo * */ @@ -30,10 +30,12 @@ public class ParallelScheduler implements Scheduler { this.maxConcurrency = maxConcurrency; } + @Override public String getName() { return name; } + @Override public Collection getJobsWaiting() { List ret = new LinkedList(); synchronized (queue) { @@ -44,6 +46,7 @@ public class ParallelScheduler implements Scheduler { return ret; } + @Override public Collection getJobsRunning() { List ret = new LinkedList(); synchronized (queue) { @@ -56,6 +59,7 @@ public class ParallelScheduler implements Scheduler { + @Override public void submit(Job job) { job.setStatus(Status.PENDING); synchronized (queue) { @@ -64,6 +68,7 @@ public class ParallelScheduler implements Scheduler { } } + @Override public void run() { synchronized (queue) { @@ -103,14 +108,41 @@ public class ParallelScheduler implements Scheduler { this.job = job; } + @Override public void run() { + if (job.isAborted()) { + job.setStatus(Status.ABORT); + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + + return; + } + + job.setStatus(Status.RUNNING); if (listener != null) { listener.jobStarted(scheduler, job); } job.run(); + if (job.isAborted()) { + job.setStatus(Status.ABORT); + } else { + if (job.getException() != null) { + job.setStatus(Status.ERROR); + } else { + job.setStatus(Status.FINISHED); + } + } + if (listener != null) { listener.jobFinished(scheduler, job); } + + // reset aborted flag to allow retry + job.aborted = false; synchronized (queue) { running.remove(job); queue.notify(); @@ -119,6 +151,7 @@ public class ParallelScheduler implements Scheduler { } + @Override public void stop() { terminate = true; synchronized (queue) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java new file mode 100644 index 0000000..bb02f13 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java @@ -0,0 +1,286 @@ +package com.nflabs.zeppelin.scheduler; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import com.nflabs.zeppelin.scheduler.Job.Status; + +/** + * + */ +public class RemoteScheduler implements Scheduler { + Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); + + List queue = new LinkedList(); + List running = new LinkedList(); + private ExecutorService executor; + private SchedulerListener listener; + boolean terminate = false; + private String name; + private int maxConcurrency; + private RemoteInterpreterProcess interpreterProcess; + + public RemoteScheduler(String name, + ExecutorService executor, + RemoteInterpreterProcess interpreterProcess, + SchedulerListener listener, + int maxConcurrency) { + this.name = name; + this.executor = executor; + this.listener = listener; + this.interpreterProcess = interpreterProcess; + this.maxConcurrency = maxConcurrency; + } + + @Override + public void run() { + synchronized (queue) { + while (terminate == false) { + if (running.size() >= maxConcurrency || queue.isEmpty() == true) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + continue; + } + + + Job job = queue.remove(0); + running.add(job); + + // run and + Scheduler scheduler = this; + executor.execute(new JobRunner(scheduler, job)); + } + } + } + + @Override + public String getName() { + return name; + } + + @Override + public Collection getJobsWaiting() { + return null; + } + + @Override + public Collection getJobsRunning() { + return null; + } + + @Override + public void submit(Job job) { + job.setStatus(Status.PENDING); + + synchronized (queue) { + queue.add(job); + queue.notify(); + } + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + synchronized (queue) { + queue.notify(); + } + } + + /** + * Role of the class is get status info from remote process + * from PENDING to RUNNING status. + */ + private class JobStatusPoller extends Thread { + private long initialPeriodMsec; + private long initialPeriodCheckIntervalMsec; + private long checkIntervalMsec; + private boolean terminate; + private JobListener listener; + private Job job; + + public JobStatusPoller( + long initialPeriodMsec, + long initialPeriodCheckIntervalMsec, + long checkIntervalMsec, + Job job, + JobListener listener + ) { + this.initialPeriodMsec = initialPeriodMsec; + this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec; + this.checkIntervalMsec = checkIntervalMsec; + this.job = job; + this.listener = listener; + this.terminate = false; + } + + @Override + public void run() { + long started = System.currentTimeMillis(); + while (terminate == false) { + long current = System.currentTimeMillis(); + long interval; + if (current - started < initialPeriodMsec) { + interval = initialPeriodCheckIntervalMsec; + } else { + interval = checkIntervalMsec; + } + + synchronized (this) { + try { + this.wait(interval); + } catch (InterruptedException e) { + } + } + + Status newStatus = getStatus(); + if (newStatus == null) { + continue; + } + + // update only RUNNING + if (newStatus == Status.RUNNING) { + listener.afterStatusChange(job, null, newStatus); + break; + } + + if (newStatus != Status.READY && + newStatus != Status.PENDING) { + // we don't need more + continue; + } + } + } + + public void shutdown() { + terminate = true; + synchronized (this) { + this.notify(); + } + } + + public synchronized Job.Status getStatus() { + if (interpreterProcess.referenceCount() <= 0) { + return null; + } + + Client client; + try { + client = interpreterProcess.getClient(); + } catch (Exception e) { + logger.error("Can't get status information", e); + return Status.FINISHED; + } + + try { + Status status = Status.valueOf(client.getStatus(job.getId())); + logger.info("getStatus from remote {}", status); + return status; + } catch (TException e) { + logger.error("Can't get status information", e); + return Status.FINISHED; + } catch (Exception e) { + // unknown status + return Status.FINISHED; + } finally { + interpreterProcess.releaseClient(client); + } + } + } + + + private class JobRunner implements Runnable, JobListener { + private Scheduler scheduler; + private Job job; + + public JobRunner(Scheduler scheduler, Job job) { + this.scheduler = scheduler; + this.job = job; + } + + @Override + public void run() { + if (job.isAborted()) { + job.setStatus(Status.ABORT); + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + + return; + } + + + JobStatusPoller jobStatusPoller = new JobStatusPoller( + 1500, + 100, + 500, + job, + this + ); + logger.info("*********** Start job status poller"); + jobStatusPoller.start(); + + if (listener != null) { + listener.jobStarted(scheduler, job); + } + job.run(); + + jobStatusPoller.shutdown(); + + job.setStatus(jobStatusPoller.getStatus()); + + if (listener != null) { + listener.jobFinished(scheduler, job); + } + + // reset aborted flag to allow retry + job.aborted = false; + + synchronized (queue) { + running.remove(job); + queue.notify(); + } + } + + @Override + public void onProgressUpdate(Job job, int progress) { + } + + @Override + public void beforeStatusChange(Job job, Status before, Status after) { + } + + @Override + public void afterStatusChange(Job job, Status before, Status after) { + // status polled by status poller + if (job.getStatus() == after) { + return; + } + + job.setStatus(after); + } + } + + + @Override + public void stop() { + terminate = true; + synchronized (queue) { + queue.notify(); + } + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java index c79f8e2..115e2b1 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/SchedulerFactory.java @@ -11,9 +11,11 @@ import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterProcess; + /** * TODO(moon) : add description. - * + * * @author Leemoonsoo * */ @@ -70,6 +72,26 @@ public class SchedulerFactory implements SchedulerListener { } } + public Scheduler createOrGetRemoteScheduler( + String name, + RemoteInterpreterProcess interpreterProcess, + int maxConcurrency) { + + synchronized (schedulers) { + if (schedulers.containsKey(name) == false) { + Scheduler s = new RemoteScheduler( + name, + executor, + interpreterProcess, + this, + maxConcurrency); + schedulers.put(name, s); + executor.execute(s); + } + return schedulers.get(name); + } + } + public Scheduler removeScheduler(String name) { synchronized (schedulers) { Scheduler s = schedulers.remove(name); @@ -90,11 +112,13 @@ public class SchedulerFactory implements SchedulerListener { return s; } + @Override public void jobStarted(Scheduler scheduler, Job job) { logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName()); } + @Override public void jobFinished(Scheduler scheduler, Job job) { logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName()); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 53bda30..bbb54b1 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -27,6 +27,7 @@ service RemoteInterpreterService { i32 getProgress(1: string className, 2: RemoteInterpreterContext interpreterContext); string getFormType(1: string className); list completion(1: string className, 2: string buf, 3: i32 cursor); - void shutdown(); + + string getStatus(1:string jobId); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java index 0641aca..37a29d1 100644 --- a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/FIFOSchedulerTest.java @@ -4,7 +4,7 @@ import junit.framework.TestCase; import com.nflabs.zeppelin.scheduler.Job.Status; -public class FIFOSchedulerTest extends TestCase{ +public class FIFOSchedulerTest extends TestCase { private SchedulerFactory schedulerSvc; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java new file mode 100644 index 0000000..8695038 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java @@ -0,0 +1,90 @@ +package com.nflabs.zeppelin.scheduler; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.nflabs.zeppelin.display.GUI; +import com.nflabs.zeppelin.interpreter.InterpreterContext; +import com.nflabs.zeppelin.interpreter.InterpreterGroup; +import com.nflabs.zeppelin.interpreter.remote.RemoteInterpreter; +import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA; + +public class RemoteSchedulerTest { + + private SchedulerFactory schedulerSvc; + + @Before + public void setUp() throws Exception{ + schedulerSvc = new SchedulerFactory(); + } + + @After + public void tearDown(){ + + } + + @Test + public void test() throws Exception { + Properties p = new Properties(); + InterpreterGroup intpGroup = new InterpreterGroup(); + Map env = new HashMap(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", + intpA.getInterpreterProcess(), + 10); + + scheduler.submit(new Job("jobName", null) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("500", new InterpreterContext( + "id", + "title", + "text", + new HashMap(), + new GUI())); + return "500"; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/7d3f55de/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java index d86f860..f38dc65 100644 --- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java @@ -272,7 +272,7 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor notebook.refreshCron(note.id()); } note.persist(); - + broadcastNote(note); broadcastNoteList(); }