zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [1/2] incubator-zeppelin git commit: [ZEPPELIN-713] Expand ResourceId and put InterpreterResult into ResourcePool
Date Tue, 15 Mar 2016 18:33:54 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master e2122bc9c -> 85a2ad376


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
index a03655b..bbc3f06 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
@@ -72,4 +72,34 @@ public class ResourceSet extends LinkedList<Resource> {
     }
     return result;
   }
+
+  public ResourceSet filterByNoteId(String noteId) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (equals(r.getResourceId().getNoteId(), noteId)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  public ResourceSet filterByParagraphId(String paragraphId) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (equals(r.getResourceId().getParagraphId(), paragraphId)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  private boolean equals(String a, String b) {
+    if (a == null && b == null) {
+      return true;
+    } else if (a != null && b != null) {
+      return a.equals(b);
+    } else {
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java
new file mode 100644
index 0000000..2d14fd4
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/WellKnownResourceName.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Well known resource names in ResourcePool
+ */
+public enum WellKnownResourceName {
+  ParagraphResult("zeppelin.paragraph.result");     // paragraph run result
+
+  String name;
+  WellKnownResourceName(String name) {
+    this.name = name;
+  }
+
+  public String toString() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 224433d..3a70caa 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -78,7 +78,9 @@ service RemoteInterpreterService {
   // get all resources in the interpreter process
   list<string> resoucePoolGetAll();
   // get value of resource
-  binary resourceGet(1: string resourceName);
+  binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
+  // remove resource
+  bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
 
   void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
   object);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
index 1db68ad..3826b90 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 
 public class MockInterpreterResourcePool extends Interpreter {
@@ -61,9 +62,18 @@ public class MockInterpreterResourcePool extends Interpreter {
   public InterpreterResult interpret(String st, InterpreterContext context) {
     String[] stmt = st.split(" ");
     String cmd = stmt[0];
+    String noteId = null;
+    String paragraphId = null;
     String name = null;
     if (stmt.length >= 2) {
-      name = stmt[1];
+      String[] npn = stmt[1].split(":");
+      if (npn.length == 3) {
+        noteId = npn[0];
+        paragraphId = npn[1];
+        name = npn[2];
+      } else {
+        name = stmt[1];
+      }
     }
     String value = null;
     if (stmt.length == 3) {
@@ -73,11 +83,16 @@ public class MockInterpreterResourcePool extends Interpreter {
     ResourcePool resourcePool = context.getResourcePool();
     Object ret = null;
     if (cmd.equals("put")) {
-      resourcePool.put(name, value);
+      resourcePool.put(noteId, paragraphId, name, value);
     } else if (cmd.equalsIgnoreCase("get")) {
-      ret = resourcePool.get(name).get();
+      Resource resource = resourcePool.get(noteId, paragraphId, name);
+      if (resource != null) {
+        ret = resourcePool.get(noteId, paragraphId, name).get();
+      } else {
+        ret = "";
+      }
     } else if (cmd.equals("remove")) {
-      ret = resourcePool.remove(name);
+      ret = resourcePool.remove(noteId, paragraphId, name);
     } else if (cmd.equals("getAll")) {
       ret = resourcePool.getAll();
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
index a99fde2..e49b437 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -136,12 +136,13 @@ public class DistributedResourcePoolTest {
     InterpreterResult ret;
     intp1.interpret("put key1 value1", context);
     intp2.interpret("put key2 value2", context);
+    int numInterpreterResult = 2;
 
     ret = intp1.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+    assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
 
     ret = intp2.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+    assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
 
     ret = intp1.interpret("get key1", context);
     assertEquals("value1", gson.fromJson(ret.message(), String.class));
@@ -201,4 +202,44 @@ public class DistributedResourcePoolTest {
     assertEquals("value1", pool1.getAll().get(0).get());
     assertEquals("value2", pool1.getAll().get(1).get());
   }
+
+  @Test
+  public void testResourcePoolUtils() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+
+    // when create some resources
+    intp1.interpret("put note1:paragraph1:key1 value1", context);
+    intp1.interpret("put note1:paragraph2:key1 value2", context);
+    intp2.interpret("put note2:paragraph1:key1 value1", context);
+    intp2.interpret("put note2:paragraph2:key2 value2", context);
+
+    int numInterpreterResult = 2;
+
+    // then get all resources.
+    assertEquals(numInterpreterResult + 4, ResourcePoolUtils.getAllResources().size());
+
+    // when remove all resources from note1
+    ResourcePoolUtils.removeResourcesBelongsToNote("note1");
+
+    // then resources should be removed.
+    assertEquals(numInterpreterResult + 2, ResourcePoolUtils.getAllResources().size());
+    assertEquals("", gson.fromJson(
+        intp1.interpret("get note1:paragraph1:key1", context).message(),
+        String.class));
+    assertEquals("", gson.fromJson(
+        intp1.interpret("get note1:paragraph2:key1", context).message(),
+        String.class));
+
+
+    // when remove all resources from note2:paragraph1
+    ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
+
+    // then 1
+    assertEquals(numInterpreterResult + 1, ResourcePoolUtils.getAllResources().size());
+    assertEquals("value2", gson.fromJson(
+        intp1.interpret("get note2:paragraph2:key2", context).message(),
+        String.class));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 3221054..f0fa385 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -434,6 +434,8 @@ public class InterpreterFactory {
       angularObjectRegistry = new AngularObjectRegistry(
           id,
           angularObjectRegistryListener);
+
+      // TODO(moon) : create distributed resource pool for local interpreters and set
     }
 
     interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index b0470c8..6a09735 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -208,6 +209,8 @@ public class Note implements Serializable, JobListener {
    * @return a paragraph that was deleted, or <code>null</code> otherwise
    */
   public Paragraph removeParagraph(String paragraphId) {
+    removeAllAngularObjectInParagraph(paragraphId);
+    ResourcePoolUtils.removeResourcesBelongsToParagraph(id(), paragraphId);
     synchronized (paragraphs) {
       Iterator<Paragraph> i = paragraphs.iterator();
       while (i.hasNext()) {
@@ -220,7 +223,7 @@ public class Note implements Serializable, JobListener {
       }
     }
 
-    removeAllAngularObjectInParagraph(paragraphId);
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 8a14b87..4827bff 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -40,6 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
 import org.quartz.CronScheduleBuilder;
@@ -307,6 +308,8 @@ public class Notebook {
       }
     }
 
+    ResourcePoolUtils.removeResourcesBelongsToNote(id);
+
     try {
       note.unpersist();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
index 079846c..cf0a613 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -45,13 +45,20 @@ public class MockInterpreter1 extends Interpreter{
 
 	@Override
 	public InterpreterResult interpret(String st, InterpreterContext context) {
+		InterpreterResult result;
 
 		if ("getId".equals(st)) {
 			// get unique id of this interpreter instance
-			return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+			result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
 		} else {
-			return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
+			result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
 		}
+
+		if (context.getResourcePool() != null) {
+			context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result",
result);
+		}
+
+		return result;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
index dd465a5..bae4b8d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter2.java
@@ -45,7 +45,19 @@ public class MockInterpreter2 extends Interpreter{
 
 	@Override
 	public InterpreterResult interpret(String st, InterpreterContext context) {
-		return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: "+st);
+		InterpreterResult result;
+
+		if ("getId".equals(st)) {
+			// get unique id of this interpreter instance
+			result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+		} else {
+			result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
+		}
+
+		if (context.getResourcePool() != null) {
+			context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result",
result);
+		}
+		return result;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/85a2ad37/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 06889d6..3c89c35 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -326,6 +328,33 @@ public class NotebookTest implements JobListenerFactory{
   }
 
   @Test
+  public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
+    Note note = notebook.createNote();
+    note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
+    }
+    Paragraph p1 = note.addParagraph();
+    p1.setText("hello");
+    Paragraph p2 = note.addParagraph();
+    p2.setText("%mock2 world");
+
+    note.runAll();
+    while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
+    while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
+
+    assertEquals(2, ResourcePoolUtils.getAllResources().size());
+
+    // remove a paragraph
+    note.removeParagraph(p1.getId());
+    assertEquals(1, ResourcePoolUtils.getAllResources().size());
+
+    // remove note
+    notebook.removeNote(note.id());
+    assertEquals(0, ResourcePoolUtils.getAllResources().size());
+  }
+
+  @Test
   public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException,
       IOException {
     // create a note and a paragraph


Mime
View raw message