zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [3/3] incubator-zeppelin git commit: [ZEPPELIN-619] Shared Resource pool across interpreter processes
Date Mon, 01 Feb 2016 04:12:10 GMT
[ZEPPELIN-619] Shared Resource pool across interpreter processes

### What is this PR for?
This is sub task of https://issues.apache.org/jira/browse/ZEPPELIN-533.
It provides shared resource pool to exchange data across interpreter processes.

### What type of PR is it?
Feature

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-619

### How should this be tested?
create two different spark interpreter settings.
create two different notebooks each bind different spark interpreter setting.

put an object from one notebook.
read the object from the other notebook. (from the other interpreter process)

See screenshot

### Screenshots (if appropriate)
![resource_pool](https://cloud.githubusercontent.com/assets/1540981/12409095/db4e142e-be1b-11e5-91e2-4b91c5b39dc3.gif)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? javadoc included.

Author: Lee moon soo <moon@apache.org>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <moon@apache.org>

Closes #655 from Leemoonsoo/resource_pool and squashes the following commits:

7b53f8b [Lee moon soo] Fix style
27b2ac5 [Lee moon soo] connector.getAllResourceExcept(id()) -> connector.getAllResource()
f46abd7 [Lee moon soo] Refactor get() method
2945a90 [Lee moon soo] ConcurrentHashMap instead of Collections.synchronizedMap()
8150466 [Lee moon soo] Remove synchronize block
6ac84e1 [Lee moon soo] Nullcheck before access InterpreterGroup
c96c168 [Lee moon soo] Merge branch 'master' into resource_pool
c3cb0d6 [Lee moon soo] Handling exception
14269d9 [Lee moon soo] Handling NPE
0d15577 [Lee moon soo] Add license header
0af0cd0 [Lee moon soo] null check interpreterGroup. Do not log expected exceptions
9d288fe [Lee moon soo] update test
263f580 [Lee moon soo] ZeppelinContext provides api for resource pool
b85dc59 [Lee moon soo] Fix test
2be9902 [Lee moon soo] Update test
0f6cb98 [Lee moon soo] distributed resource pool across interpreter processes
91f75e0 [Lee moon soo] distributed resource pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ddf2c89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ddf2c89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ddf2c89e

Branch: refs/heads/master
Commit: ddf2c89ec364b28665a42145a832f6416341c771
Parents: 218a3b5
Author: Lee moon soo <moon@apache.org>
Authored: Sun Jan 31 05:39:29 2016 +0900
Committer: Lee moon soo <moon@apache.org>
Committed: Mon Feb 1 13:14:52 2016 +0900

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkInterpreterTest.java    |    2 +-
 .../zeppelin/hive/HiveInterpreterTest.java      |   14 +-
 .../zeppelin/ignite/IgniteInterpreterTest.java  |    2 +-
 .../ignite/IgniteSqlInterpreterTest.java        |    2 +-
 .../zeppelin/jdbc/JDBCInterpreterTest.java      |    4 +-
 .../scalding/ScaldingInterpreterTest.java       |    2 +-
 .../apache/zeppelin/spark/ZeppelinContext.java  |   63 +-
 .../zeppelin/spark/DepInterpreterTest.java      |    1 +
 .../zeppelin/spark/SparkInterpreterTest.java    |    1 +
 .../zeppelin/spark/SparkSqlInterpreterTest.java |    2 +
 .../interpreter/InterpreterContext.java         |    8 +
 .../zeppelin/interpreter/InterpreterGroup.java  |   31 +-
 .../interpreter/remote/RemoteInterpreter.java   |   12 +-
 .../remote/RemoteInterpreterEventClient.java    |  245 +
 .../remote/RemoteInterpreterEventPoller.java    |  133 +-
 .../remote/RemoteInterpreterProcess.java        |   13 +-
 .../remote/RemoteInterpreterServer.java         |  163 +-
 .../remote/RemoteInterpreterUtils.java          |    3 +-
 .../thrift/RemoteInterpreterContext.java        |    2 +-
 .../thrift/RemoteInterpreterEvent.java          |    2 +-
 .../thrift/RemoteInterpreterEventType.java      |   12 +-
 .../thrift/RemoteInterpreterResult.java         |    2 +-
 .../thrift/RemoteInterpreterService.java        | 4179 ++++++++++++++++--
 .../resource/ByteBufferInputStream.java         |   58 +
 .../resource/DistributedResourcePool.java       |   78 +
 .../zeppelin/resource/LocalResourcePool.java    |   77 +
 .../zeppelin/resource/RemoteResource.java       |   55 +
 .../org/apache/zeppelin/resource/Resource.java  |  132 +
 .../apache/zeppelin/resource/ResourceId.java    |   53 +
 .../apache/zeppelin/resource/ResourcePool.java  |   55 +
 .../resource/ResourcePoolConnector.java         |   34 +
 .../apache/zeppelin/resource/ResourceSet.java   |   75 +
 .../main/thrift/RemoteInterpreterService.thrift |   20 +-
 .../interpreter/InterpreterContextTest.java     |    2 +-
 .../remote/RemoteAngularObjectTest.java         |    5 +-
 .../RemoteInterpreterOutputTestStream.java      |   17 +-
 .../remote/RemoteInterpreterTest.java           |   10 +
 .../mock/MockInterpreterResourcePool.java       |  112 +
 .../resource/DistributedResourcePoolTest.java   |  201 +
 .../resource/LocalResourcePoolTest.java         |   48 +
 .../zeppelin/resource/ResourceSetTest.java      |   53 +
 .../apache/zeppelin/resource/ResourceTest.java  |   35 +
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   20 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |    9 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |    4 +
 .../interpreter/InterpreterFactoryTest.java     |    2 +-
 46 files changed, 5541 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 9a61be6..5a91542 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
     Properties p = new Properties();
     flink = new FlinkInterpreter(p);
     flink.open();
-    context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
+    context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c86fcf3..8f1285d 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
+    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message().contains("SCHEMA_NAME"));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
   }
 
   @Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
     t.open();
 
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
+        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
   }
 
   @Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
     t.open();
 
     InterpreterResult interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
 
     t.getConnection("default").close();
 
     interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
   }
 
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
+    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null, null);
 
     //simple select test
     InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
@@ -193,4 +193,4 @@ public class HiveInterpreterTest {
     assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
     hi.close();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index cf98083..5976e21 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
 
   private IgniteInterpreter intp;
   private Ignite ignite;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index a6dcc66..7f66523 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
 
   private Ignite ignite;
   private IgniteSqlInterpreter intp;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
----------------------------------------------------------------------
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index 5d376f2..049b137 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -94,7 +94,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
@@ -116,7 +116,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 606d4d9..1a6f2b9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -64,7 +64,7 @@ public class ScaldingInterpreterTest {
     InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
         new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
-            intpGroup.getId(), null),
+            intpGroup.getId(), null), null,
         new LinkedList<InterpreterContextRunner>(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 0201188..389037b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -26,7 +26,6 @@ import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -44,15 +43,19 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
 
 import scala.Tuple2;
 import scala.Unit;
 import scala.collection.Iterable;
+import scala.collection.JavaConversions;
 
 /**
  * Spark context for zeppelin.
  */
-public class ZeppelinContext extends HashMap<String, Object> {
+public class ZeppelinContext {
   private SparkDependencyResolver dep;
   private InterpreterContext interpreterContext;
   private int maxResult;
@@ -632,4 +635,60 @@ public class ZeppelinContext extends HashMap<String, Object> {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
     registry.remove(name, noteId, null);
   }
+
+
+  /**
+   * Add object into resource pool
+   * @param name
+   * @param value
+   */
+  public void put(String name, Object value) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.put(name, value);
+  }
+
+  /**
+   * Get object from resource pool
+   * Search local process first and then the other processes
+   * @param name
+   * @return null if resource not found
+   */
+  public Object get(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    if (resource != null) {
+      return resource.get();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Remove object from resourcePool
+   * @param name
+   */
+  public void remove(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.remove(name);
+  }
+
+  /**
+   * Check if resource pool has the object
+   * @param name
+   * @return
+   */
+  public boolean containsKey(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    return resource != null;
+  }
+
+  /**
+   * Get all resources
+   */
+  public ResourceSet getAll() {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    return resourcePool.getAll();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 2b5613a..11c0beb 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,6 +60,7 @@ public class DepInterpreterTest {
 
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
         new LinkedList<InterpreterContextRunner>(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 7064e73..ea08f17 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -79,6 +79,7 @@ public class SparkInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            null,
             new LinkedList<InterpreterContextRunner>(),
             new InterpreterOutput(new InterpreterOutputListener() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 731eab6..30de6d6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,6 +67,7 @@ public class SparkSqlInterpreterTest {
     }
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
         new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
       @Override
       public void onAppend(InterpreterOutput out, byte[] line) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index e3f6b59..fd76912 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.resource.ResourcePool;
 
 /**
  * Interpreter context
@@ -50,6 +51,7 @@ public class InterpreterContext {
   private final Map<String, Object> config;
   private GUI gui;
   private AngularObjectRegistry angularObjectRegistry;
+  private ResourcePool resourcePool;
   private List<InterpreterContextRunner> runners;
 
   public InterpreterContext(String noteId,
@@ -59,6 +61,7 @@ public class InterpreterContext {
                             Map<String, Object> config,
                             GUI gui,
                             AngularObjectRegistry angularObjectRegistry,
+                            ResourcePool resourcePool,
                             List<InterpreterContextRunner> runners,
                             InterpreterOutput out
                             ) {
@@ -69,6 +72,7 @@ public class InterpreterContext {
     this.config = config;
     this.gui = gui;
     this.angularObjectRegistry = angularObjectRegistry;
+    this.resourcePool = resourcePool;
     this.runners = runners;
     this.out = out;
   }
@@ -102,6 +106,10 @@ public class InterpreterContext {
     return angularObjectRegistry;
   }
 
+  public ResourcePool getResourcePool() {
+    return resourcePool;
+  }
+
   public List<InterpreterContextRunner> getRunners() {
     return runners;
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 5af6241..4d450be 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -17,14 +17,13 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.resource.ResourcePool;
 
 /**
  * InterpreterGroup is list of interpreters in the same group.
@@ -37,13 +36,27 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
 
   AngularObjectRegistry angularObjectRegistry;
   RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote interpreter process
+  ResourcePool resourcePool;
+
+  private static final Map<String, InterpreterGroup> allInterpreterGroups =
+      new ConcurrentHashMap<String, InterpreterGroup>();
+
+  public static InterpreterGroup get(String id) {
+    return allInterpreterGroups.get(id);
+  }
+
+  public static Collection<InterpreterGroup> getAll() {
+    return new LinkedList(allInterpreterGroups.values());
+  }
 
   public InterpreterGroup(String id) {
     this.id = id;
+    allInterpreterGroups.put(id, this);
   }
 
   public InterpreterGroup() {
     getId();
+    allInterpreterGroups.put(id, this);
   }
 
   private static String generateId() {
@@ -135,5 +148,15 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
         remoteInterpreterProcess.dereference();
       }
     }
+
+    allInterpreterGroups.remove(id);
+  }
+
+  public void setResourcePool(ResourcePool resourcePool) {
+    this.resourcePool = resourcePool;
+  }
+
+  public ResourcePool getResourcePool() {
+    return resourcePool;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index f1eec08..43c934f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -147,8 +147,8 @@ public class RemoteInterpreter extends Interpreter {
           for (Interpreter intp : this.getInterpreterGroup()) {
             logger.info("Create remote interpreter {}", intp.getClassName());
             property.put("zeppelin.interpreter.localRepo", localRepoPath);
-            client.createInterpreter(intp.getClassName(), (Map) property);
-
+            client.createInterpreter(getInterpreterGroup().getId(),
+                    intp.getClassName(), (Map) property);
           }
         } catch (TException e) {
           broken = true;
@@ -176,7 +176,9 @@ public class RemoteInterpreter extends Interpreter {
     boolean broken = false;
     try {
       client = interpreterProcess.getClient();
-      client.close(className);
+      if (client != null) {
+        client.close(className);
+      }
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -295,6 +297,10 @@ public class RemoteInterpreter extends Interpreter {
   @Override
   public int getProgress(InterpreterContext context) {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+      return 0;
+    }
+
     Client client = null;
     try {
       client = interpreterProcess.getClient();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
new file mode 100644
index 0000000..158f145
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.resource.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Thread connection ZeppelinServer -> RemoteInterpreterServer does not provide
+ * remote method invocation from RemoteInterpreterServer -> ZeppelinServer
+ *
+ * This class provides event send and get response from RemoteInterpreterServer to
+ * ZeppelinServer.
+ *
+ * RemoteInterpreterEventPoller is counter part in ZeppelinServer
+ */
+public class RemoteInterpreterEventClient implements ResourcePoolConnector {
+  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEvent.class);
+  private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
+  private final List<ResourceSet> getAllResourceResponse = new LinkedList<ResourceSet>();
+  private final Map<ResourceId, Object> getResourceResponse = new HashMap<ResourceId, Object>();
+  private final Gson gson = new Gson();
+
+  /**
+   * Run paragraph
+   * @param runner
+   */
+  public void run(InterpreterContextRunner runner) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
+        gson.toJson(runner)));
+  }
+
+  /**
+   * notify new angularObject creation
+   * @param object
+   */
+  public void angularObjectAdd(AngularObject object) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
+  }
+
+  /**
+   * notify angularObject update
+   */
+  public void angularObjectUpdate(AngularObject object) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
+  }
+
+  /**
+   * notify angularObject removal
+   */
+  public void angularObjectRemove(String name, String noteId, String paragraphId) {
+    Map<String, String> removeObject = new HashMap<String, String>();
+    removeObject.put("name", name);
+    removeObject.put("noteId", noteId);
+    removeObject.put("paragraphId", paragraphId);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+  }
+
+
+  /**
+   * Get all resources except for specific resourcePool
+   * @return
+   */
+  @Override
+  public ResourceSet getAllResources() {
+    // request
+    sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL, null));
+
+    synchronized (getAllResourceResponse) {
+      while (getAllResourceResponse.isEmpty()) {
+        try {
+          getAllResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+      ResourceSet resourceSet = getAllResourceResponse.remove(0);
+      return resourceSet;
+    }
+  }
+
+  @Override
+  public Object readResource(ResourceId resourceId) {
+    logger.debug("Request Read Resource {} from ZeppelinServer", resourceId.getName());
+    synchronized (getResourceResponse) {
+      // wait for previous response consumed
+      while (getResourceResponse.containsKey(resourceId)) {
+        try {
+          getResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+
+      // send request
+      Gson gson = new Gson();
+      sendEvent(new RemoteInterpreterEvent(
+          RemoteInterpreterEventType.RESOURCE_GET,
+          gson.toJson(resourceId)));
+
+      // wait for response
+      while (!getResourceResponse.containsKey(resourceId)) {
+        try {
+          getResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+      Object o = getResourceResponse.remove(resourceId);
+      getResourceResponse.notifyAll();
+      return o;
+    }
+  }
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   */
+  public void putResponseGetAllResources(List<String> resources) {
+    logger.debug("ResourceSet from ZeppelinServer");
+    ResourceSet resourceSet = new ResourceSet();
+
+    for (String res : resources) {
+      RemoteResource resource = gson.fromJson(res, RemoteResource.class);
+      resource.setResourcePoolConnector(this);
+      resourceSet.add(resource);
+    }
+
+    synchronized (getAllResourceResponse) {
+      getAllResourceResponse.add(resourceSet);
+      getAllResourceResponse.notify();
+    }
+  }
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   * @param resourceId json serialized ResourceId
+   * @param object java serialized of the object
+   */
+  public void putResponseGetResource(String resourceId, ByteBuffer object) {
+    ResourceId rid = gson.fromJson(resourceId, ResourceId.class);
+
+    logger.debug("Response resource {} from RemoteInterpreter", rid.getName());
+
+    Object o = null;
+    try {
+      o = Resource.deserializeObject(object);
+    } catch (IOException e) {
+      logger.error(e.getMessage(), e);
+    } catch (ClassNotFoundException e) {
+      logger.error(e.getMessage(), e);
+    }
+
+    synchronized (getResourceResponse) {
+      getResourceResponse.put(rid, o);
+      getResourceResponse.notifyAll();
+    }
+  }
+
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   * @return next available event
+   */
+  public RemoteInterpreterEvent pollEvent() {
+    synchronized (eventQueue) {
+      if (eventQueue.isEmpty()) {
+        try {
+          eventQueue.wait(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+
+      if (eventQueue.isEmpty()) {
+        return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
+      } else {
+        RemoteInterpreterEvent event = eventQueue.remove(0);
+        logger.debug("Send event {}", event.getType());
+        return event;
+      }
+    }
+  }
+
+  public void onInterpreterOutputAppend(String noteId, String paragraphId, String output) {
+    Map<String, String> appendOutput = new HashMap<String, String>();
+    appendOutput.put("noteId", noteId);
+    appendOutput.put("paragraphId", paragraphId);
+    appendOutput.put("data", output);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.OUTPUT_APPEND,
+        gson.toJson(appendOutput)));
+  }
+
+  public void onInterpreterOutputUpdate(String noteId, String paragraphId, String output) {
+    Map<String, String> appendOutput = new HashMap<String, String>();
+    appendOutput.put("noteId", noteId);
+    appendOutput.put("paragraphId", paragraphId);
+    appendOutput.put("data", output);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.OUTPUT_UPDATE,
+        gson.toJson(appendOutput)));
+  }
+
+
+  private void sendEvent(RemoteInterpreterEvent event) {
+    synchronized (eventQueue) {
+      eventQueue.add(event);
+      eventQueue.notifyAll();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index b1055e2..be28bbf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -28,13 +28,21 @@ import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourceId;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Processes message from RemoteInterpreter process
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
@@ -117,6 +125,15 @@ public class RemoteInterpreterEventPoller extends Thread {
 
           interpreterProcess.getInterpreterContextRunnerPool().run(
               runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
+          ResourceSet resourceSet = getAllResourcePoolExcept();
+          sendResourcePoolResponseGetAll(resourceSet);
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
+          String resourceIdString = event.getData();
+          ResourceId resourceId = gson.fromJson(resourceIdString, ResourceId.class);
+          logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
+          Object o = getResource(resourceId);
+          sendResourceResponseGet(resourceId, o);
         } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
           // on output append
           Map<String, String> outputAppend = gson.fromJson(
@@ -143,6 +160,120 @@ public class RemoteInterpreterEventPoller extends Thread {
     }
   }
 
+  private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      List<String> resourceList = new LinkedList<String>();
+      Gson gson = new Gson();
+      for (Resource r : resourceSet) {
+        resourceList.add(gson.toJson(r));
+      }
+      client.resourcePoolResponseGetAll(resourceList);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+  }
+
+  private ResourceSet getAllResourcePoolExcept() {
+    ResourceSet resourceSet = new ResourceSet();
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      if (intpGroup.getId().equals(interpreterGroup.getId())) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (interpreterProcess.isRunning()) {
+        Client client = null;
+        boolean broken = false;
+        try {
+          client = remoteInterpreterProcess.getClient();
+          List<String> resourceList = client.resoucePoolGetAll();
+          Gson gson = new Gson();
+          for (String res : resourceList) {
+            resourceSet.add(gson.fromJson(res, Resource.class));
+          }
+        } catch (Exception e) {
+          logger.error(e.getMessage(), e);
+          broken = true;
+        } finally {
+          if (client != null) {
+            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+          }
+        }
+      }
+    }
+    return resourceSet;
+  }
+
+
+
+  private void sendResourceResponseGet(ResourceId resourceId, Object o) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      Gson gson = new Gson();
+      String rid = gson.toJson(resourceId);
+      ByteBuffer obj;
+      if (o == null) {
+        obj = ByteBuffer.allocate(0);
+      } else {
+        obj = Resource.serializeObject(o);
+      }
+      client.resourceResponseGet(rid, obj);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+  }
+
+  private Object getResource(ResourceId resourceId) {
+    InterpreterGroup intpGroup = InterpreterGroup.get(resourceId.getResourcePoolId());
+    if (intpGroup == null) {
+      return null;
+    }
+    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      ResourcePool localPool = intpGroup.getResourcePool();
+      if (localPool != null) {
+        return localPool.get(resourceId.getName());
+      }
+    } else if (interpreterProcess.isRunning()) {
+      Client client = null;
+      boolean broken = false;
+      try {
+        client = remoteInterpreterProcess.getClient();
+        ByteBuffer res = client.resourceGet(resourceId.getName());
+        Object o = Resource.deserializeObject(res);
+        return o;
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        broken = true;
+      } finally {
+        if (client != null) {
+          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+        }
+      }
+    }
+    return null;
+  }
+
   private void waitQuietly() {
     try {
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c88894..9a2d503 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -149,6 +149,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   }
 
   public Client getClient() throws Exception {
+    if (clientPool == null || clientPool.isClosed()) {
+      return null;
+    }
     return clientPool.borrowObject();
   }
 
@@ -191,7 +194,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
         } catch (Exception e) {
           // safely ignore exception while client.shutdown() may terminates remote process
           logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " +
-              "safely ignore exception while client.shutdown() may terminates remote process", e);
+              "safely ignore exception while client.shutdown() may terminates remote process");
+          logger.debug(e.getMessage(), e);
         } finally {
           if (client != null) {
             // no longer used
@@ -303,8 +307,13 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
     } catch (TException e) {
       broken = true;
       logger.error("Can't update angular object", e);
+    } catch (NullPointerException e) {
+      logger.error("Remote interpreter process not started", e);
+      return;
     } finally {
-      releaseClient(client, broken);
+      if (client != null) {
+        releaseClient(client, broken);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 02736fe..c3a0f90 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -18,9 +18,11 @@
 package org.apache.zeppelin.interpreter.remote;
 
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -39,9 +41,9 @@ import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.resource.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -64,6 +66,7 @@ public class RemoteInterpreterServer
 
   InterpreterGroup interpreterGroup;
   AngularObjectRegistry angularObjectRegistry;
+  DistributedResourcePool resourcePool;
   Gson gson = new Gson();
 
   RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
@@ -71,13 +74,10 @@ public class RemoteInterpreterServer
   private int port;
   private TThreadPoolServer server;
 
-  List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
+  RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
 
   public RemoteInterpreterServer(int port) throws TTransportException {
     this.port = port;
-    interpreterGroup = new InterpreterGroup();
-    angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
-    interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
 
     processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
     TServerSocket serverTransport = new TServerSocket(port);
@@ -93,8 +93,10 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
-    interpreterGroup.close();
-    interpreterGroup.destroy();
+    if (interpreterGroup != null) {
+      interpreterGroup.close();
+      interpreterGroup.destroy();
+    }
 
     server.stop();
 
@@ -140,8 +142,18 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void createInterpreter(String className, Map<String, String> properties)
+  public void createInterpreter(String interpreterGroupId, String className, Map<String, String>
+          properties)
       throws TException {
+
+    if (interpreterGroup == null) {
+      interpreterGroup = new InterpreterGroup(interpreterGroupId);
+      angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+      resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
+      interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+      interpreterGroup.setResourcePool(resourcePool);
+    }
+
     try {
       Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
       Properties p = new Properties();
@@ -240,6 +252,7 @@ public class RemoteInterpreterServer
         context.getGui());
   }
 
+
   class InterpretJobListener implements JobListener {
 
     @Override
@@ -383,6 +396,7 @@ public class RemoteInterpreterServer
             new TypeToken<Map<String, Object>>() {}.getType()),
         gson.fromJson(ric.getGui(), GUI.class),
         interpreterGroup.getAngularObjectRegistry(),
+        interpreterGroup.getResourcePool(),
         contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
   }
 
@@ -391,30 +405,12 @@ public class RemoteInterpreterServer
     return new InterpreterOutput(new InterpreterOutputListener() {
       @Override
       public void onAppend(InterpreterOutput out, byte[] line) {
-        Map<String, String> appendOutput = new HashMap<String, String>();
-        appendOutput.put("noteId", noteId);
-        appendOutput.put("paragraphId", paragraphId);
-        appendOutput.put("data", new String(line));
-
-        Gson gson = new Gson();
-
-        sendEvent(new RemoteInterpreterEvent(
-                RemoteInterpreterEventType.OUTPUT_APPEND,
-                gson.toJson(appendOutput)));
+        eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
       }
 
       @Override
       public void onUpdate(InterpreterOutput out, byte[] output) {
-        Map<String, String> appendOutput = new HashMap<String, String>();
-        appendOutput.put("noteId", noteId);
-        appendOutput.put("paragraphId", paragraphId);
-        appendOutput.put("data", new String(output));
-
-        Gson gson = new Gson();
-
-        sendEvent(new RemoteInterpreterEvent(
-                RemoteInterpreterEventType.OUTPUT_UPDATE,
-                gson.toJson(appendOutput)));
+        eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
       }
     });
   }
@@ -431,10 +427,7 @@ public class RemoteInterpreterServer
 
     @Override
     public void run() {
-      Gson gson = new Gson();
-      server.sendEvent(new RemoteInterpreterEvent(
-          RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
-          gson.toJson(this)));
+      server.eventClient.run(this);
     }
   }
 
@@ -451,6 +444,10 @@ public class RemoteInterpreterServer
   @Override
   public String getStatus(String jobId)
       throws TException {
+    if (interpreterGroup == null) {
+      return "Unknown";
+    }
+
     synchronized (interpreterGroup) {
       for (Interpreter intp : interpreterGroup) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
@@ -473,50 +470,28 @@ public class RemoteInterpreterServer
 
   @Override
   public void onAdd(String interpreterGroupId, AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
+    eventClient.angularObjectAdd(object);
   }
 
   @Override
   public void onUpdate(String interpreterGroupId, AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
+    eventClient.angularObjectUpdate(object);
   }
 
   @Override
   public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
-    Map<String, String> removeObject = new HashMap<String, String>();
-    removeObject.put("name", name);
-    removeObject.put("noteId", noteId);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+    eventClient.angularObjectRemove(name, noteId, paragraphId);
   }
 
-  private void sendEvent(RemoteInterpreterEvent event) {
-    synchronized (eventQueue) {
-      eventQueue.add(event);
-      eventQueue.notifyAll();
-    }
-  }
 
+  /**
+   * Poll event from RemoteInterpreterEventPoller
+   * @return
+   * @throws TException
+   */
   @Override
   public RemoteInterpreterEvent getEvent() throws TException {
-    synchronized (eventQueue) {
-      if (eventQueue.isEmpty()) {
-        try {
-          eventQueue.wait(1000);
-        } catch (InterruptedException e) {
-          logger.info("Exception in RemoteInterpreterServer while getEvent, eventQueue.wait", e);
-        }
-      }
-
-      if (eventQueue.isEmpty()) {
-        return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
-      } else {
-        return eventQueue.remove(0);
-      }
-    }
+    return eventClient.pollEvent();
   }
 
   /**
@@ -534,7 +509,7 @@ public class RemoteInterpreterServer
     // first try local objects
     AngularObject ao = registry.get(name, noteId, paragraphId);
     if (ao == null) {
-      logger.error("Angular object {} not exists", name);
+      logger.debug("Angular object {} not exists", name);
       return;
     }
 
@@ -551,8 +526,8 @@ public class RemoteInterpreterServer
         ao.set(value, false);
         return;
       } catch (Exception e) {
-        // no luck
-        logger.info("Exception in RemoteInterpreterServer while angularObjectUpdate, no luck", e);
+        // it's not a previous object's type. proceed to treat as a generic type
+        logger.debug(e.getMessage(), e);
       }
     }
 
@@ -563,8 +538,8 @@ public class RemoteInterpreterServer
           new TypeToken<Map<String, Object>>() {
           }.getType());
       } catch (Exception e) {
-        // no lock
-        logger.info("Exception in RemoteInterpreterServer while angularObjectUpdate, no lock", e);
+        // it's not a generic json object, too. okay, proceed to threat as a string type
+        logger.debug(e.getMessage(), e);
       }
     }
 
@@ -598,8 +573,8 @@ public class RemoteInterpreterServer
           new TypeToken<Map<String, Object>>() {
           }.getType());
     } catch (Exception e) {
-      // nolock
-      logger.info("Exception in RemoteInterpreterServer while angularObjectAdd, nolock", e);
+      // it's okay. proceed to treat object as a string
+      logger.debug(e.getMessage(), e);
     }
 
     // try string object type at last
@@ -616,4 +591,52 @@ public class RemoteInterpreterServer
     AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
     registry.remove(name, noteId, paragraphId, false);
   }
+
+  @Override
+  public void resourcePoolResponseGetAll(List<String> resources) throws TException {
+    eventClient.putResponseGetAllResources(resources);
+  }
+
+  /**
+   * Get payload of resource from remote
+   * @param resourceId json serialized ResourceId
+   * @param object java serialized of the object
+   * @throws TException
+   */
+  @Override
+  public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException {
+    eventClient.putResponseGetResource(resourceId, object);
+  }
+
+  @Override
+  public List<String> resoucePoolGetAll() throws TException {
+    logger.debug("Request getAll from ZeppelinServer");
+
+    ResourceSet resourceSet = resourcePool.getAll(false);
+    List<String> result = new LinkedList<String>();
+    Gson gson = new Gson();
+
+    for (Resource r : resourceSet) {
+      result.add(gson.toJson(r));
+    }
+
+    return result;
+  }
+
+  @Override
+  public ByteBuffer resourceGet(String resourceName) throws TException {
+    logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
+    Resource resource = resourcePool.get(resourceName, false);
+
+    if (resource == null || resource.get() == null || !resource.isSerializable()) {
+      return ByteBuffer.allocate(0);
+    } else {
+      try {
+        return Resource.serializeObject(resource.get());
+      } catch (IOException e) {
+        logger.error(e.getMessage(), e);
+        return ByteBuffer.allocate(0);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4d2e46e..a66b52a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -47,7 +47,8 @@ public class RemoteInterpreterUtils {
       discover.close();
       return true;
     } catch (IOException e) {
-      LOGGER.info("Exception in RemoteInterpreterUtils while checkIfRemoteEndpointAccessible", e);
+      // end point is not accessible
+      LOGGER.debug(e.getMessage(), e);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index 175f482..b6a3da1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 79203fb..e560ec8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index d650318..7cb7963 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -34,8 +34,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   ANGULAR_OBJECT_UPDATE(3),
   ANGULAR_OBJECT_REMOVE(4),
   RUN_INTERPRETER_CONTEXT_RUNNER(5),
-  OUTPUT_APPEND(6),
-  OUTPUT_UPDATE(7);
+  RESOURCE_POOL_GET_ALL(6),
+  RESOURCE_GET(7),
+  OUTPUT_APPEND(8),
+  OUTPUT_UPDATE(9);
 
   private final int value;
 
@@ -67,8 +69,12 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
       case 5:
         return RUN_INTERPRETER_CONTEXT_RUNNER;
       case 6:
-        return OUTPUT_APPEND;
+        return RESOURCE_POOL_GET_ALL;
       case 7:
+        return RESOURCE_GET;
+      case 8:
+        return OUTPUT_APPEND;
+      case 9:
         return OUTPUT_UPDATE;
       default:
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index cc50f9c..6539756 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 


Mime
View raw message