zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [02/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC framework between ZeppelinServer and InterpreterProcess on top of thrift
Date Fri, 15 Jun 2018 01:09:57 GMT
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
new file mode 100644
index 0000000..24df1c6
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.AppendOutputRunner;
+import org.apache.zeppelin.interpreter.remote.InvokeResourceMethodEventMessage;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObject;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.apache.zeppelin.interpreter.thrift.AppOutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.AppOutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.AppStatusUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.RegisterInfo;
+import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
+import org.apache.zeppelin.resource.RemoteResource;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourceId;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class RemoteInterpreterEventServer implements RemoteInterpreterEventService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventServer.class);
+
+  private String portRange;
+  private int port;
+  private String host;
+  private TThreadPoolServer thriftServer;
+  private InterpreterSettingManager interpreterSettingManager;
+
+  private final ScheduledExecutorService appendService =
+      Executors.newSingleThreadScheduledExecutor();
+  private ScheduledFuture<?> appendFuture;
+  private AppendOutputRunner runner;
+  private final RemoteInterpreterProcessListener listener;
+  private final ApplicationEventListener appListener;
+  private final Gson gson = new Gson();
+
+  public RemoteInterpreterEventServer(ZeppelinConfiguration zConf,
+                                      InterpreterSettingManager interpreterSettingManager) {
+    this.portRange = zConf.getZeppelinServerRPCPortRange();
+    this.interpreterSettingManager = interpreterSettingManager;
+    this.listener = interpreterSettingManager.getRemoteInterpreterProcessListener();
+    this.appListener = interpreterSettingManager.getAppEventListener();
+  }
+
+  public void run() {
+    TServerSocket tSocket = null;
+    try {
+      tSocket = RemoteInterpreterUtils.createTServerSocket(portRange);
+      port = tSocket.getServerSocket().getLocalPort();
+      host = RemoteInterpreterUtils.findAvailableHostAddress();
+    } catch (IOException e1) {
+      throw new RuntimeException(e1);
+    }
+
+    LOGGER.info("InterpreterEventServer will start. Port: {}", port);
+    RemoteInterpreterEventService.Processor processor =
+        new RemoteInterpreterEventService.Processor(this);
+    this.thriftServer = new TThreadPoolServer(
+        new TThreadPoolServer.Args(tSocket).processor(processor));
+    this.thriftServer.serve();
+  }
+
+  public void start() throws IOException {
+    Thread startingThread = new Thread() {
+      @Override
+      public void run() {
+        TServerSocket tSocket = null;
+        try {
+          tSocket = RemoteInterpreterUtils.createTServerSocket(portRange);
+          port = tSocket.getServerSocket().getLocalPort();
+          host = RemoteInterpreterUtils.findAvailableHostAddress();
+        } catch (IOException e1) {
+          throw new RuntimeException(e1);
+        }
+
+        LOGGER.info("InterpreterEventServer will start. Port: {}", port);
+        RemoteInterpreterEventService.Processor processor =
+            new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this);
+        thriftServer = new TThreadPoolServer(
+            new TThreadPoolServer.Args(tSocket).processor(processor));
+        thriftServer.serve();
+      }
+    };
+    startingThread.start();
+    long start = System.currentTimeMillis();
+    while ((System.currentTimeMillis() - start) < 30 * 1000) {
+      if (thriftServer != null && thriftServer.isServing()) {
+        break;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (thriftServer != null && !thriftServer.isServing()) {
+      throw new IOException("Fail to start InterpreterEventServer in 30 seconds.");
+    }
+    LOGGER.info("InterpreterEventServer is started");
+
+    runner = new AppendOutputRunner(listener);
+    appendFuture = appendService.scheduleWithFixedDelay(
+        runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+  }
+
+  public void stop() {
+    if (thriftServer != null) {
+      thriftServer.stop();
+    }
+    if (appendFuture != null) {
+      appendFuture.cancel(true);
+    }
+  }
+
+
+  public int getPort() {
+    return port;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public void registerInterpreterProcess(RegisterInfo registerInfo) throws TException {
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(registerInfo.getInterpreterGroupId());
+    if (interpreterGroup == null) {
+      LOGGER.warn("No such interpreterGroup: " + registerInfo.getInterpreterGroupId());
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess =
+        ((ManagedInterpreterGroup) interpreterGroup).getInterpreterProcess();
+    if (interpreterProcess == null) {
+      LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " +
+          registerInfo.getInterpreterGroupId());
+    }
+    ((RemoteInterpreterManagedProcess) interpreterProcess)
+        .processStarted(registerInfo.port, registerInfo.host);
+  }
+
+  @Override
+  public void appendOutput(OutputAppendEvent event) throws TException {
+    if (event.getAppId() == null) {
+      runner.appendBuffer(
+          event.getNoteId(), event.getParagraphId(), event.getIndex(), event.getData());
+    } else {
+      appListener.onOutputAppend(event.getNoteId(), event.getParagraphId(), event.getIndex(),
+          event.getAppId(), event.getData());
+    }
+  }
+
+  @Override
+  public void updateOutput(OutputUpdateEvent event) throws TException {
+    if (event.getAppId() == null) {
+      listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(),
+          InterpreterResult.Type.valueOf(event.getType()), event.getData());
+    } else {
+      appListener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(),
+          event.getAppId(), InterpreterResult.Type.valueOf(event.getType()), event.getData());
+    }
+  }
+
+  @Override
+  public void updateAllOutput(OutputUpdateAllEvent event) throws TException {
+    listener.onOutputClear(event.getNoteId(), event.getParagraphId());
+    for (int i = 0; i < event.getMsg().size(); i++) {
+      RemoteInterpreterResultMessage msg = event.getMsg().get(i);
+      listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), i,
+          InterpreterResult.Type.valueOf(msg.getType()), msg.getData());
+    }
+  }
+
+  @Override
+  public void appendAppOutput(AppOutputAppendEvent event) throws TException {
+    appListener.onOutputAppend(event.noteId, event.paragraphId, event.index, event.appId,
+        event.data);
+  }
+
+  @Override
+  public void updateAppOutput(AppOutputUpdateEvent event) throws TException {
+    appListener.onOutputUpdated(event.noteId, event.paragraphId, event.index, event.appId,
+        InterpreterResult.Type.valueOf(event.type), event.data);
+  }
+
+  @Override
+  public void updateAppStatus(AppStatusUpdateEvent event) throws TException {
+    appListener.onStatusChange(event.noteId, event.paragraphId, event.appId, event.status);
+  }
+
+
+  @Override
+  public void runParagraphs(RunParagraphsEvent event) {
+    try {
+      listener.runParagraphs(event.getNoteId(), event.getParagraphIndices(),
+          event.getParagraphIds(), event.getCurParagraphId());
+      if (InterpreterContext.get() != null) {
+        LOGGER.info("complete runParagraphs." + InterpreterContext.get().getParagraphId() + " "
+          + event);
+      } else {
+        LOGGER.info("complete runParagraphs." + event);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void addAngularObject(String intpGroupId, String json) throws TException {
+    AngularObject angularObject = AngularObject.fromJson(json);
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(intpGroupId);
+    if (interpreterGroup == null) {
+      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+    }
+    interpreterGroup.getAngularObjectRegistry().add(angularObject.getName(),
+        angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
+  }
+
+  @Override
+  public void updateAngularObject(String intpGroupId, String json) throws TException {
+    AngularObject angularObject = AngularObject.fromJson(json);
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(intpGroupId);
+    if (interpreterGroup == null) {
+      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+    }
+    AngularObject localAngularObject = interpreterGroup.getAngularObjectRegistry().get(
+        angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
+    if (localAngularObject instanceof RemoteAngularObject) {
+      // to avoid ping-pong loop
+      ((RemoteAngularObject) localAngularObject).set(
+          angularObject.get(), true, false);
+    } else {
+      localAngularObject.set(angularObject.get());
+    }
+  }
+
+  @Override
+  public void removeAngularObject(String intpGroupId,
+                                  String noteId,
+                                  String paragraphId,
+                                  String name) throws TException {
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(intpGroupId);
+    if (interpreterGroup == null) {
+      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+    }
+    interpreterGroup.getAngularObjectRegistry().remove(name, noteId, paragraphId);
+  }
+
+  @Override
+  public void sendMetaInfo(String intpGroupId, String json) throws TException {
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(intpGroupId);
+    if (interpreterGroup == null) {
+      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+    }
+
+    Map<String, String> metaInfos = gson.fromJson(json,
+        new TypeToken<Map<String, String>>() {
+        }.getType());
+    String settingId = RemoteInterpreterUtils.
+        getInterpreterSettingId(interpreterGroup.getId());
+    listener.onMetaInfosReceived(settingId, metaInfos);
+  }
+
+  @Override
+  public void sendParagraphInfo(String intpGroupId, String json) throws TException {
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(intpGroupId);
+    if (interpreterGroup == null) {
+      throw new TException("Invalid InterpreterGroupId: " + intpGroupId);
+    }
+
+    Map<String, String> paraInfos = gson.fromJson(json,
+        new TypeToken<Map<String, String>>() {
+        }.getType());
+    String noteId = paraInfos.get("noteId");
+    String paraId = paraInfos.get("paraId");
+    String settingId = RemoteInterpreterUtils.
+        getInterpreterSettingId(interpreterGroup.getId());
+    if (noteId != null && paraId != null && settingId != null) {
+      listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
+    }
+  }
+
+  @Override
+  public List<String> getAllResources(String intpGroupId) throws TException {
+    ResourceSet resourceSet = getAllResourcePoolExcept(intpGroupId);
+    List<String> resourceList = new LinkedList<>();
+    for (Resource r : resourceSet) {
+      resourceList.add(r.toJson());
+    }
+    return resourceList;
+  }
+
+  @Override
+  public ByteBuffer getResource(String resourceIdJson) throws TException {
+    ResourceId resourceId = ResourceId.fromJson(resourceIdJson);
+    Object o = getResource(resourceId);
+    ByteBuffer obj;
+    if (o == null) {
+      obj = ByteBuffer.allocate(0);
+    } else {
+      try {
+        obj = Resource.serializeObject(o);
+      } catch (IOException e) {
+        throw new TException(e);
+      }
+    }
+    return obj;
+  }
+
+  @Override
+  public ByteBuffer invokeMethod(String intpGroupId, String invokeMethodJson) throws TException {
+    InvokeResourceMethodEventMessage invokeMethodMessage =
+        InvokeResourceMethodEventMessage.fromJson(invokeMethodJson);
+    Object ret = invokeResourceMethod(intpGroupId, invokeMethodMessage);
+    ByteBuffer obj = null;
+    if (ret == null) {
+      obj = ByteBuffer.allocate(0);
+    } else {
+      try {
+        obj = Resource.serializeObject(ret);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    return obj;
+  }
+
+  private Object invokeResourceMethod(String intpGroupId,
+                                      final InvokeResourceMethodEventMessage message) {
+    final ResourceId resourceId = message.resourceId;
+    ManagedInterpreterGroup intpGroup =
+        interpreterSettingManager.getInterpreterGroupById(resourceId.getResourcePoolId());
+    if (intpGroup == null) {
+      return null;
+    }
+
+    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      ResourcePool localPool = intpGroup.getResourcePool();
+      if (localPool != null) {
+        Resource res = localPool.get(resourceId.getName());
+        if (res != null) {
+          try {
+            return res.invokeMethod(
+                message.methodName,
+                message.getParamTypes(),
+                message.params,
+                message.returnResourceName);
+          } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+            return null;
+          }
+        } else {
+          // object is null. can't invoke any method
+          LOGGER.error("Can't invoke method {} on null object", message.methodName);
+          return null;
+        }
+      } else {
+        LOGGER.error("no resource pool");
+        return null;
+      }
+    } else if (interpreterSettingManager.getInterpreterGroupById(intpGroupId)
+        .getInterpreterProcess().isRunning()) {
+      ByteBuffer res = interpreterSettingManager.getInterpreterGroupById(intpGroupId)
+          .getInterpreterProcess().callRemoteFunction(
+          new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+            @Override
+            public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception {
+              return client.resourceInvokeMethod(
+                  resourceId.getNoteId(),
+                  resourceId.getParagraphId(),
+                  resourceId.getName(),
+                  message.toJson());
+            }
+          }
+      );
+
+      try {
+        return Resource.deserializeObject(res);
+      } catch (Exception e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+      return null;
+    }
+    return null;
+  }
+
+  private Object getResource(final ResourceId resourceId) {
+    ManagedInterpreterGroup intpGroup = interpreterSettingManager
+        .getInterpreterGroupById(resourceId.getResourcePoolId());
+    if (intpGroup == null) {
+      return null;
+    }
+    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+          @Override
+          public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception {
+            return  client.resourceGet(
+                resourceId.getNoteId(),
+                resourceId.getParagraphId(),
+                resourceId.getName());
+          }
+        }
+    );
+
+    try {
+      Object o = Resource.deserializeObject(buffer);
+      return o;
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+    return null;
+  }
+
+  private ResourceSet getAllResourcePoolExcept(String interpreterGroupId) {
+    ResourceSet resourceSet = new ResourceSet();
+    for (ManagedInterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
+      if (intpGroup.getId().equals(interpreterGroupId)) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            }
+        );
+        for (String res : resourceList) {
+          resourceSet.add(RemoteResource.fromJson(res));
+        }
+      }
+    }
+    return resourceSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
index e107fb7..b271d9b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
@@ -82,9 +82,10 @@ public class ShellScriptLauncher extends InterpreterLauncher {
           + context.getInterpreterSettingId();
       return new RemoteInterpreterManagedProcess(
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
-          zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
+          context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
-          buildEnvFromProperties(context), connectTimeout, name, option.isUserImpersonate());
+          buildEnvFromProperties(context), connectTimeout, name,
+          context.getInterpreterGroupId(), option.isUserImpersonate());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
index 4fc860d..4dffff1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
@@ -24,7 +24,6 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.notebook.FileSystemStorage;
@@ -114,11 +113,6 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
               interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
           // interpreterSettingManager may be null when this class is used when it is used
           // stop-interpreter.sh
-          if (interpreterSettingManager != null) {
-            client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller(
-                interpreterSettingManager.getRemoteInterpreterProcessListener(),
-                interpreterSettingManager.getAppEventListener()));
-          }
           clients.put(groupId, client);
           LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
         }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 924901b..7458ce5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
+import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 34ed804..efbe648 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -29,7 +29,6 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.ConfInterpreter;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LifecycleManager;
@@ -214,16 +213,6 @@ public class RemoteInterpreter extends Interpreter {
     } catch (IOException e) {
       throw new InterpreterException(e);
     }
-    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
-        .getInterpreterContextRunnerPool();
-    List<InterpreterContextRunner> runners = context.getRunners();
-    if (runners != null && runners.size() != 0) {
-      // assume all runners in this InterpreterContext have the same note id
-      String noteId = runners.get(0).getNoteId();
-
-      interpreterContextRunnerPool.clear(noteId);
-      interpreterContextRunnerPool.addAll(noteId, runners);
-    }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
     return interpreterProcess.callRemoteFunction(
         new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
@@ -395,7 +384,8 @@ public class RemoteInterpreter extends Interpreter {
     // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs
     // running under the scheduler of this session will be aborted.
     Scheduler s = new RemoteScheduler(
-        RemoteInterpreter.class.getName() + "-" + getInterpreterGroup().getId() + "-" + sessionId,
+        RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-"
+            + sessionId,
         SchedulerFactory.singleton().getExecutor(),
         sessionId,
         this,
@@ -407,8 +397,7 @@ public class RemoteInterpreter extends Interpreter {
   private RemoteInterpreterContext convert(InterpreterContext ic) {
     return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
         ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
-        gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getNoteGui()),
-        gson.toJson(ic.getRunners()));
+        gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getNoteGui()));
   }
 
   private InterpreterResult convert(RemoteInterpreterResult result) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
deleted file mode 100644
index abda81e..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
-import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourceId;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourceSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Processes message from RemoteInterpreter process
- */
-public class RemoteInterpreterEventPoller extends Thread {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
-  private final ScheduledExecutorService appendService =
-      Executors.newSingleThreadScheduledExecutor();
-  private final RemoteInterpreterProcessListener listener;
-  private final ApplicationEventListener appListener;
-
-  private volatile boolean shutdown;
-
-  private RemoteInterpreterProcess interpreterProcess;
-  private ManagedInterpreterGroup interpreterGroup;
-
-  Gson gson = new Gson();
-
-  public RemoteInterpreterEventPoller(
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener) {
-    this.listener = listener;
-    this.appListener = appListener;
-    shutdown = false;
-  }
-
-  public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) {
-    this.interpreterProcess = interpreterProcess;
-  }
-
-  public void setInterpreterGroup(ManagedInterpreterGroup interpreterGroup) {
-    this.interpreterGroup = interpreterGroup;
-  }
-
-  @Override
-  public void run() {
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
-        runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-
-    while (!shutdown) {
-      // wait and retry
-      if (!interpreterProcess.isRunning()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // nothing to do
-        }
-        continue;
-      }
-
-      RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
-          new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
-            @Override
-            public RemoteInterpreterEvent call(Client client) throws Exception {
-              return client.getEvent();
-            }
-          }
-      );
-
-      AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
-
-      try {
-        if (event.getType() != RemoteInterpreterEventType.NO_OP) {
-          logger.debug("Receive message from RemoteInterpreter Process: " + event.toString());
-        }
-        if (event.getType() == RemoteInterpreterEventType.NO_OP) {
-          continue;
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          angularObjectRegistry.add(angularObject.getName(),
-              angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          AngularObject localAngularObject = angularObjectRegistry.get(
-              angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
-          if (localAngularObject instanceof RemoteAngularObject) {
-            // to avoid ping-pong loop
-            ((RemoteAngularObject) localAngularObject).set(
-                angularObject.get(), true, false);
-          } else {
-            localAngularObject.set(angularObject.get());
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
-                  angularObject.getParagraphId());
-        } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
-          InterpreterContextRunner runnerFromRemote = gson.fromJson(
-              event.getData(), RemoteInterpreterContextRunner.class);
-
-          listener.onRemoteRunParagraph(
-              runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
-
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
-          ResourceSet resourceSet = getAllResourcePoolExcept();
-          sendResourcePoolResponseGetAll(resourceSet);
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
-          String resourceIdString = event.getData();
-          ResourceId resourceId = ResourceId.fromJson(resourceIdString);
-          logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
-          Object o = getResource(resourceId);
-          sendResourceResponseGet(resourceId, o);
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
-          String message = event.getData();
-          InvokeResourceMethodEventMessage invokeMethodMessage =
-              InvokeResourceMethodEventMessage.fromJson(message);
-          Object ret = invokeResourceMethod(invokeMethodMessage);
-          sendInvokeMethodResult(invokeMethodMessage, ret);
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
-          // on output append
-          Map<String, String> outputAppend = gson.fromJson(
-                  event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputAppend.get("noteId");
-          String paragraphId = (String) outputAppend.get("paragraphId");
-          int index = Integer.parseInt(outputAppend.get("index"));
-          String outputToAppend = (String) outputAppend.get("data");
-
-          String appId = (String) outputAppend.get("appId");
-
-          if (appId == null) {
-            runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
-          } else {
-            appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
-          Map<String, Object> outputUpdate = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputUpdate.get("noteId");
-          String paragraphId = (String) outputUpdate.get("paragraphId");
-
-          // clear the output
-          List<Map<String, String>> messages =
-              (List<Map<String, String>>) outputUpdate.get("messages");
-
-          if (messages != null) {
-            listener.onOutputClear(noteId, paragraphId);
-            for (int i = 0; i < messages.size(); i++) {
-              Map<String, String> m = messages.get(i);
-              InterpreterResult.Type type =
-                  InterpreterResult.Type.valueOf((String) m.get("type"));
-              String outputToUpdate = (String) m.get("data");
-
-              listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate);
-            }
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
-          // on output update
-          Map<String, String> outputAppend = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputAppend.get("noteId");
-          String paragraphId = (String) outputAppend.get("paragraphId");
-          int index = Integer.parseInt(outputAppend.get("index"));
-          InterpreterResult.Type type =
-              InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
-          String outputToUpdate = (String) outputAppend.get("data");
-          String appId = (String) outputAppend.get("appId");
-
-          if (appId == null) {
-            listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
-          } else {
-            appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
-          // on output update
-          Map<String, String> appStatusUpdate = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, String>>() {}.getType());
-
-          String noteId = appStatusUpdate.get("noteId");
-          String paragraphId = appStatusUpdate.get("paragraphId");
-          String appId = appStatusUpdate.get("appId");
-          String status = appStatusUpdate.get("status");
-
-          appListener.onStatusChange(noteId, paragraphId, appId, status);
-        } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
-          RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson(
-              event.getData());
-          progressRemoteZeppelinControlEvent(
-              reqResourceBody.getResourceType(), listener, reqResourceBody);
-
-        } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
-          Map<String, String> metaInfos = gson.fromJson(event.getData(),
-              new TypeToken<Map<String, String>>() {
-              }.getType());
-          String settingId = RemoteInterpreterUtils.
-              getInterpreterSettingId(interpreterGroup.getId());
-          listener.onMetaInfosReceived(settingId, metaInfos);
-        } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) {
-          Map<String, String> paraInfos = gson.fromJson(event.getData(),
-              new TypeToken<Map<String, String>>() {
-              }.getType());
-          String noteId = paraInfos.get("noteId");
-          String paraId = paraInfos.get("paraId");
-          String settingId = RemoteInterpreterUtils.
-              getInterpreterSettingId(interpreterGroup.getId());
-          if (noteId != null && paraId != null && settingId != null) {
-            listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
-          }
-        }
-      } catch (Exception e) {
-        logger.error("Can't handle event " + event, e);
-      }
-    }
-    try {
-      clearUnreadEvents(interpreterProcess.getClient());
-    } catch (Exception e1) {
-      if (shutdown) {
-        logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
-      } else {
-        logger.error("Can't get RemoteInterpreterEvent", e1);
-      }
-    }
-    if (appendFuture != null) {
-      appendFuture.cancel(true);
-    }
-  }
-
-  private void clearUnreadEvents(Client client) throws TException {
-    while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
-  }
-
-  private void progressRemoteZeppelinControlEvent(
-      RemoteZeppelinServerResource.Type resourceType,
-      RemoteInterpreterProcessListener remoteWorksEventListener,
-      RemoteZeppelinServerResource reqResourceBody) throws Exception {
-    boolean broken = false;
-    final Gson gson = new Gson();
-    final String eventOwnerKey = reqResourceBody.getOwnerKey();
-    try {
-      if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
-        final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
-
-        ZeppelinServerResourceParagraphRunner reqRunnerContext =
-            new ZeppelinServerResourceParagraphRunner();
-
-        Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
-        String noteId = (String) reqResourceMap.get("noteId");
-        String paragraphId = (String) reqResourceMap.get("paragraphId");
-
-        reqRunnerContext.setNoteId(noteId);
-        reqRunnerContext.setParagraphId(paragraphId);
-
-        RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
-            new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
-
-              @Override
-              public void onFinished(Object resultObject) {
-                if (resultObject != null && resultObject instanceof List) {
-                  List<InterpreterContextRunner> runnerList =
-                      (List<InterpreterContextRunner>) resultObject;
-                  for (InterpreterContextRunner r : runnerList) {
-                    remoteRunners.add(
-                        new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
-                    );
-                  }
-
-                  final RemoteZeppelinServerResource resResource =
-                      new RemoteZeppelinServerResource();
-                  resResource.setOwnerKey(eventOwnerKey);
-                  resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
-                  resResource.setData(remoteRunners);
-
-                  interpreterProcess.callRemoteFunction(
-                      new RemoteInterpreterProcess.RemoteFunction<Void>() {
-                        @Override
-                        public Void call(Client client) throws Exception {
-                          client.onReceivedZeppelinResource(resResource.toJson());
-                          return null;
-                        }
-                      }
-                  );
-                }
-              }
-
-              @Override
-              public void onError() {
-                logger.info("onGetParagraphRunners onError");
-              }
-            };
-
-        remoteWorksEventListener.onGetParagraphRunners(
-            reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
-      }
-    } catch (Exception e) {
-      logger.error("Can't get RemoteInterpreterEvent", e);
-      waitQuietly();
-
-    }
-  }
-
-  private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            List<String> resourceList = new LinkedList<>();
-            for (Resource r : resourceSet) {
-              resourceList.add(r.toJson());
-            }
-            client.resourcePoolResponseGetAll(resourceList);
-            return null;
-          }
-        }
-    );
-  }
-
-  private ResourceSet getAllResourcePoolExcept() {
-    ResourceSet resourceSet = new ResourceSet();
-    for (ManagedInterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager().getAllInterpreterGroup()) {
-      if (intpGroup.getId().equals(interpreterGroup.getId())) {
-        continue;
-      }
-
-      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-      } else if (interpreterProcess.isRunning()) {
-        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            }
-        );
-        for (String res : resourceList) {
-          resourceSet.add(Resource.fromJson(res));
-        }
-      }
-    }
-    return resourceSet;
-  }
-
-  private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            String rid = resourceId.toJson();
-            ByteBuffer obj;
-            if (o == null) {
-              obj = ByteBuffer.allocate(0);
-            } else {
-              obj = Resource.serializeObject(o);
-            }
-            client.resourceResponseGet(rid, obj);
-            return null;
-          }
-        }
-    );
-  }
-
-  private Object getResource(final ResourceId resourceId) {
-    ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager()
-        .getInterpreterGroupById(resourceId.getResourcePoolId());
-    if (intpGroup == null) {
-      return null;
-    }
-    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-          @Override
-          public ByteBuffer call(Client client) throws Exception {
-            return  client.resourceGet(
-                resourceId.getNoteId(),
-                resourceId.getParagraphId(),
-                resourceId.getName());
-          }
-        }
-    );
-
-    try {
-      Object o = Resource.deserializeObject(buffer);
-      return o;
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
-                                     final Object o) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            String invokeMessage = message.toJson();
-            ByteBuffer obj;
-            if (o == null) {
-              obj = ByteBuffer.allocate(0);
-            } else {
-              obj = Resource.serializeObject(o);
-            }
-            client.resourceResponseInvokeMethod(invokeMessage, obj);
-            return null;
-          }
-        }
-    );
-  }
-
-  private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
-    final ResourceId resourceId = message.resourceId;
-    ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
-    if (intpGroup == null) {
-      return null;
-    }
-
-    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      ResourcePool localPool = intpGroup.getResourcePool();
-      if (localPool != null) {
-        Resource res = localPool.get(resourceId.getName());
-        if (res != null) {
-          try {
-            return res.invokeMethod(
-                message.methodName,
-                message.getParamTypes(),
-                message.params,
-                message.returnResourceName);
-          } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            return null;
-          }
-        } else {
-          // object is null. can't invoke any method
-          logger.error("Can't invoke method {} on null object", message.methodName);
-          return null;
-        }
-      } else {
-        logger.error("no resource pool");
-        return null;
-      }
-    } else if (interpreterProcess.isRunning()) {
-      ByteBuffer res = interpreterProcess.callRemoteFunction(
-          new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-            @Override
-            public ByteBuffer call(Client client) throws Exception {
-              return client.resourceInvokeMethod(
-                  resourceId.getNoteId(),
-                  resourceId.getParagraphId(),
-                  resourceId.getName(),
-                  message.toJson());
-            }
-          }
-      );
-
-      try {
-        return Resource.deserializeObject(res);
-      } catch (Exception e) {
-        logger.error(e.getMessage(), e);
-      }
-      return null;
-    }
-    return null;
-  }
-
-  private void waitQuietly() {
-    try {
-      synchronized (this) {
-        wait(1000);
-      }
-    } catch (InterruptedException ignored) {
-      logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", ignored);
-    }
-  }
-
-  public void shutdown() {
-    shutdown = true;
-    synchronized (this) {
-      notify();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index b186e48..5160522 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -26,13 +26,6 @@ import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.exec.LogOutputStream;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,39 +45,44 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       RemoteInterpreterManagedProcess.class);
 
   private final String interpreterRunner;
-  private final String callbackPortRange;
+  private final int zeppelinServerRPCPort;
+  private final String zeppelinServerRPCHost;
   private final String interpreterPortRange;
   private DefaultExecutor executor;
   private ExecuteWatchdog watchdog;
   private AtomicBoolean running = new AtomicBoolean(false);
-  private TServer callbackServer;
   private String host = null;
   private int port = -1;
   private final String interpreterDir;
   private final String localRepoDir;
   private final String interpreterSettingName;
+  private final String interpreterGroupId;
   private final boolean isUserImpersonated;
 
   private Map<String, String> env;
 
   public RemoteInterpreterManagedProcess(
       String intpRunner,
-      String callbackPortRange,
+      int zeppelinServerRPCPort,
+      String zeppelinServerRPCHost,
       String interpreterPortRange,
       String intpDir,
       String localRepoDir,
       Map<String, String> env,
       int connectTimeout,
       String interpreterSettingName,
+      String interpreterGroupId,
       boolean isUserImpersonated) {
     super(connectTimeout);
     this.interpreterRunner = intpRunner;
-    this.callbackPortRange = callbackPortRange;
+    this.zeppelinServerRPCPort = zeppelinServerRPCPort;
+    this.zeppelinServerRPCHost = zeppelinServerRPCHost;
     this.interpreterPortRange = interpreterPortRange;
     this.env = env;
     this.interpreterDir = intpDir;
     this.localRepoDir = localRepoDir;
     this.interpreterSettingName = interpreterSettingName;
+    this.interpreterGroupId = interpreterGroupId;
     this.isUserImpersonated = isUserImpersonated;
   }
 
@@ -101,69 +99,17 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
   @Override
   public void start(String userName) {
     // start server process
-    final String callbackHost;
-    final int callbackPort;
-    TServerSocket tSocket = null;
-    try {
-      tSocket = RemoteInterpreterUtils.createTServerSocket(callbackPortRange);
-      callbackPort = tSocket.getServerSocket().getLocalPort();
-      callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
-    } catch (IOException e1) {
-      throw new RuntimeException(e1);
-    }
-
-    logger.info("Thrift server for callback will start. Port: {}", callbackPort);
-    try {
-      callbackServer = new TThreadPoolServer(
-        new TThreadPoolServer.Args(tSocket).processor(
-          new RemoteInterpreterCallbackService.Processor<>(
-            new RemoteInterpreterCallbackService.Iface() {
-              @Override
-              public void callback(CallbackInfo callbackInfo) throws TException {
-                logger.info("RemoteInterpreterServer Registered: {}", callbackInfo);
-                host = callbackInfo.getHost();
-                port = callbackInfo.getPort();
-                running.set(true);
-                synchronized (running) {
-                  running.notify();
-                }
-              }
-            })));
-      // Start thrift server to receive callbackInfo from RemoteInterpreterServer;
-      new Thread(new Runnable() {
-        @Override
-        public void run() {
-          callbackServer.serve();
-        }
-      }).start();
-
-      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-        @Override
-        public void run() {
-          if (callbackServer.isServing()) {
-            callbackServer.stop();
-          }
-        }
-      }));
-
-      while (!callbackServer.isServing()) {
-        logger.debug("callbackServer is not serving");
-        Thread.sleep(500);
-      }
-      logger.debug("callbackServer is serving now");
-    } catch (InterruptedException e) {
-      logger.warn("", e);
-    }
-
     CommandLine cmdLine = CommandLine.parse(interpreterRunner);
     cmdLine.addArgument("-d", false);
     cmdLine.addArgument(interpreterDir, false);
     cmdLine.addArgument("-c", false);
-    cmdLine.addArgument(callbackHost, false);
+    cmdLine.addArgument(zeppelinServerRPCHost, false);
     cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(Integer.toString(callbackPort), false);
+    cmdLine.addArgument(String.valueOf(zeppelinServerRPCPort), false);
     cmdLine.addArgument("-r", false);
     cmdLine.addArgument(interpreterPortRange, false);
+    cmdLine.addArgument("-i", false);
+    cmdLine.addArgument(interpreterGroupId, false);
     if (isUserImpersonated && !userName.equals("anonymous")) {
       cmdLine.addArgument("-u", false);
       cmdLine.addArgument(userName, false);
@@ -201,7 +147,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
         }
       }
       if (!running.get()) {
-        callbackServer.stop();
         throw new RuntimeException(new String(cmdOut.toByteArray()));
       }
     } catch (InterruptedException e) {
@@ -211,11 +156,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
   }
 
   public void stop() {
-    // shutdown EventPoller first.
-    this.getRemoteInterpreterEventPoller().shutdown();
-    if (callbackServer.isServing()) {
-      callbackServer.stop();
-    }
     if (isRunning()) {
       logger.info("Kill interpreter process");
       try {
@@ -245,6 +185,16 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
 
   }
 
+  // called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started
+  public void processStarted(int port, String host) {
+    this.port = port;
+    this.host = host;
+    synchronized (running) {
+      running.set(true);
+      running.notify();
+    }
+  }
+
   @Override
   public void onProcessFailed(ExecuteException e) {
     logger.info("Interpreter process failed {}", e);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 08653ae..787e882 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -19,8 +19,6 @@ package org.apache.zeppelin.interpreter.remote;
 import com.google.gson.Gson;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.thrift.TException;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
@@ -33,7 +31,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
 
   private GenericObjectPool<Client> clientPool;
-  private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
   private int connectTimeout;
 
@@ -43,14 +40,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     this.connectTimeout = connectTimeout;
   }
 
-  public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
-    return remoteInterpreterEventPoller;
-  }
-
-  public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPoller) {
-    this.remoteInterpreterEventPoller = eventPoller;
-  }
-
   public int getConnectTimeout() {
     return connectTimeout;
   }
@@ -122,10 +111,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     }
   }
 
-  public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
-    return interpreterContextRunnerPool;
-  }
-
   public <T> T callRemoteFunction(RemoteFunction<T> func) {
     Client client = null;
     boolean broken = false;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
index 8b23bf2..ec6b1ec 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -18,6 +18,8 @@ package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.zeppelin.interpreter.InterpreterResult;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -29,17 +31,10 @@ public interface RemoteInterpreterProcessListener {
       String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
   public void onOutputClear(String noteId, String paragraphId);
   public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
-  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
-  public void onGetParagraphRunners(
-      String noteId, String paragraphId, RemoteWorksEventListener callback);
+  void runParagraphs(String noteId, List<Integer> paragraphIndices, List<String> paragraphIds,
+                     String curParagraphId)
+      throws IOException;
 
-  /**
-   * Remote works for Interpreter callback listener
-   */
-  public interface RemoteWorksEventListener {
-    public void onFinished(Object resultObject);
-    public void onError();
-  }
   public void onParaInfosReceived(String noteId, String paragraphId,
                                   String interpreterSettingId, Map<String, String> metaInfos);
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 3dc2b99..b8d3de1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -584,6 +584,10 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     return null;
   }
 
+  public Paragraph getParagraph(int index) {
+    return paragraphs.get(index);
+  }
+
   public Paragraph getLastParagraph() {
     synchronized (paragraphs) {
       return paragraphs.get(paragraphs.size() - 1);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 57756b8..c76b3da 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -562,9 +562,21 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     }
 
     InterpreterContext interpreterContext =
-        new InterpreterContext(note.getId(), getId(), intpText, this.getTitle(),
-            this.getText(), this.getAuthenticationInfo(), this.getConfig(), this.settings,
-            getNoteGui(), registry, resourcePool, runners, output);
+        InterpreterContext.builder()
+            .setNoteId(note.getId())
+            .setParagraphId(getId())
+            .setReplName(intpText)
+            .setParagraphTitle(title)
+            .setParagraphText(text)
+            .setAuthenticationInfo(authenticationInfo)
+            .setConfig(config)
+            .setGUI(settings)
+            .setNoteGUI(getNoteGui())
+            .setAngularObjectRegistry(registry)
+            .setResourcePool(resourcePool)
+            .setInterpreterOut(output)
+            .build();
+
     return interpreterContext;
   }
 
@@ -592,9 +604,20 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     }
 
     InterpreterContext interpreterContext =
-        new InterpreterContext(note.getId(), getId(), intpText, this.getTitle(),
-            this.getText(), this.getAuthenticationInfo(), this.getConfig(), this.settings,
-            getNoteGui(), registry, resourcePool, runners, output);
+        InterpreterContext.builder()
+            .setNoteId(note.getId())
+            .setParagraphId(getId())
+            .setReplName(intpText)
+            .setParagraphTitle(title)
+            .setParagraphText(text)
+            .setAuthenticationInfo(authenticationInfo)
+            .setConfig(config)
+            .setGUI(settings)
+            .setNoteGUI(getNoteGui())
+            .setAngularObjectRegistry(registry)
+            .setResourcePool(resourcePool)
+            .setInterpreterOut(output)
+            .build();
     return interpreterContext;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
index 1a6a7b1..1628596 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
@@ -73,4 +73,11 @@ public abstract class AbstractInterpreterTest {
   protected Note createNote() {
     return new Note(null, interpreterFactory, interpreterSettingManager, null, null, null, null);
   }
+
+  protected InterpreterContext createDummyInterpreterContext() {
+    return InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
index 4d74c7c..892b43d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/ConfInterpreterTest.java
@@ -17,18 +17,12 @@
 
 package org.apache.zeppelin.interpreter;
 
-import com.sun.net.httpserver.Authenticator;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ConfInterpreterTest extends AbstractInterpreterTest {
@@ -39,9 +33,11 @@ public class ConfInterpreterTest extends AbstractInterpreterTest {
     assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
     ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
 
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
-        null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
+
     InterpreterResult result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code);
 
@@ -68,9 +64,10 @@ public class ConfInterpreterTest extends AbstractInterpreterTest {
     assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
     ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
 
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
-        null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
     InterpreterResult result = confInterpreter.interpret("", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code);
 
@@ -88,9 +85,11 @@ public class ConfInterpreterTest extends AbstractInterpreterTest {
     assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
     ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
 
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
-        null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
+
     RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
     InterpreterResult result = remoteInterpreter.interpret("hello world", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
index 1d3e2ff..f49ee94 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
@@ -51,7 +51,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
     assertEquals("test", interpreterSetting.getName());
     assertEquals("test", interpreterSetting.getGroup());
     assertTrue(interpreterSetting.getLifecycleManager() instanceof NullLifecycleManager);
-    assertEquals(3, interpreterSetting.getInterpreterInfos().size());
+    assertEquals(8, interpreterSetting.getInterpreterInfos().size());
     // 3 other builtin properties:
     //   * zeppelin.interpreter.output.limit
     //   * zeppelin.interpreter.localRepo
@@ -86,7 +86,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
     interpreterSetting = interpreterSettingManager2.getByName("test");
     assertEquals("test", interpreterSetting.getName());
     assertEquals("test", interpreterSetting.getGroup());
-    assertEquals(3, interpreterSetting.getInterpreterInfos().size());
+    assertEquals(8, interpreterSetting.getInterpreterInfos().size());
     assertEquals(6, interpreterSetting.getJavaProperties().size());
     assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
     assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
index 99087a5..6301d0d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
@@ -46,7 +46,7 @@ public class ShellScriptLauncherTest {
     properties.setProperty("property_1", "value_1");
     InterpreterOption option = new InterpreterOption();
     option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 82f9e2a..2e4b165 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -49,7 +49,7 @@ public class SparkInterpreterLauncherTest {
     properties.setProperty("spark.jars", "jar_1");
 
     InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@@ -74,7 +74,7 @@ public class SparkInterpreterLauncherTest {
     properties.setProperty("spark.jars", "jar_1");
 
     InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@@ -100,7 +100,7 @@ public class SparkInterpreterLauncherTest {
     properties.setProperty("spark.jars", "jar_1");
 
     InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@@ -125,7 +125,7 @@ public class SparkInterpreterLauncherTest {
     properties.setProperty("spark.jars", "jar_1");
 
     InterpreterOption option = new InterpreterOption();
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@@ -153,7 +153,7 @@ public class SparkInterpreterLauncherTest {
 
     InterpreterOption option = new InterpreterOption();
     option.setUserImpersonate(true);
-    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
+    InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
index 1041502..21d4b0d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java
@@ -18,20 +18,15 @@
 package org.apache.zeppelin.interpreter.lifecycle;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -61,9 +56,10 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
     // InterpreterGroup is not removed after 15 seconds, as TimeoutLifecycleManager only manage it after it is started
     assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
 
-    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-        new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
     remoteInterpreter.interpret("hello world", context);
     assertTrue(remoteInterpreter.isOpened());
 
@@ -98,9 +94,10 @@ public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest {
 
       @Override
       protected Object jobRun() throws Throwable {
-        InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
-            "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-            new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
+        InterpreterContext context = InterpreterContext.builder()
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .build();
         return remoteInterpreter.interpret("100000", context);
       }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
index cf1899c..2eaed74 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorageTest.java
@@ -3,23 +3,18 @@ package org.apache.zeppelin.interpreter.recovery;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 
@@ -49,9 +44,10 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
 
     Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
     RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
-    InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-        new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context1 = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
     remoteInterpreter1.interpret("hello", context1);
 
     assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
@@ -67,17 +63,19 @@ public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
 
     Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
     RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
-    InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-        new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context1 = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
     remoteInterpreter1.interpret("hello", context1);
     assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
 
     Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2");
     RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
-    InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "repl",
-        "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
-        new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
+    InterpreterContext context2 = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .build();
     remoteInterpreter2.interpret("hello", context2);
 
     assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
index c9dc5c0..7bbd2b8 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -139,7 +139,6 @@ public class AppendOutputRunnerTest {
     TestAppender appender = new TestAppender();
     Logger logger = Logger.getRootLogger();
     logger.addAppender(appender);
-    Logger.getLogger(RemoteInterpreterEventPoller.class);
 
     runner.run();
     List<LoggingEvent> log;


Mime
View raw message