zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: ZEPPELIN-298 Paragraph progress bar and % complete not working
Date Fri, 11 Sep 2015 04:43:58 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master eb29c6daa -> ae1c06bb1


ZEPPELIN-298 Paragraph progress bar and % complete not working

This PR fixes ZEPPELIN-298 by searching for the correct "addListener" method, i.e. the one that
accepts a JobProgressListener as its parameter.

Author: Lee moon soo <moon@apache.org>

Closes #294 from Leemoonsoo/ZEPPELIN-298 and squashes the following commits:

6fb2397 [Lee moon soo] Fix job progress listener registration


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ae1c06bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ae1c06bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ae1c06bb

Branch: refs/heads/master
Commit: ae1c06bb10a49784aff0f807d803b874df504a79
Parents: eb29c6d
Author: Lee moon soo <moon@apache.org>
Authored: Thu Sep 10 14:35:27 2015 -0700
Committer: Lee moon soo <moon@apache.org>
Committed: Thu Sep 10 21:43:54 2015 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/spark/SparkInterpreter.java | 36 ++++++++++++++++----
 .../zeppelin/spark/SparkSqlInterpreter.java     |  2 ++
 .../zeppelin/spark/SparkInterpreterTest.java    | 12 +++++--
 3 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index dfb846c..a2e31fb 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -157,15 +157,41 @@ public class SparkInterpreter extends Interpreter {
     return sc != null;
   }
 
-  private static JobProgressListener setupListeners(SparkContext context) {
+  static JobProgressListener setupListeners(SparkContext context) {
     JobProgressListener pl = new JobProgressListener(context.getConf());
     try {
       Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
-      Method m = listenerBus.getClass().getMethod("addListener", SparkListener.class);
-      m.invoke(listenerBus, pl);
+
+      Method[] methods = listenerBus.getClass().getMethods();
+      Method addListenerMethod = null;
+      for (Method m : methods) {
+        if (!m.getName().equals("addListener")) {
+          continue;
+        }
+
+        Class<?>[] parameterTypes = m.getParameterTypes();
+
+        if (parameterTypes.length != 1) {
+          continue;
+        }
+
+        if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
+          continue;
+        }
+
+        addListenerMethod = m;
+        break;
+      }
+
+      if (addListenerMethod != null) {
+        addListenerMethod.invoke(listenerBus, pl);
+      } else {
+        return null;
+      }
     } catch (NoSuchMethodException | SecurityException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException e) {
       e.printStackTrace();
+      return null;
     }
     return pl;
   }
@@ -676,12 +702,11 @@ public class SparkInterpreter extends Interpreter {
     while (it.hasNext()) {
       ActiveJob job = it.next();
       String g = (String) job.properties().get("spark.jobGroup.id");
-
       if (jobGroup.equals(g)) {
         int[] progressInfo = null;
         try {
           Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
-          if (sparkVersion.getProgress1_0()) {          
+          if (sparkVersion.getProgress1_0()) {
             progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
           } else {
             progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
@@ -748,7 +773,6 @@ public class SparkInterpreter extends Interpreter {
           this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
 
       Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
-
       Set<Tuple2<Object, Object>> keys =
           JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava();
       for (Tuple2<Object, Object> k : keys) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 7b26e34..1ee5f9c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -125,6 +125,7 @@ public class SparkSqlInterpreter extends Interpreter {
       sc.setLocalProperty("spark.scheduler.pool", null);
     }
 
+    sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed
@@ -139,6 +140,7 @@ public class SparkSqlInterpreter extends Interpreter {
     }
 
     String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
+    sc.clearJobGroup();
     return new InterpreterResult(Code.SUCCESS, msg);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 87177f1..be65c09 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.util.HashMap;
@@ -26,6 +25,7 @@ import java.util.LinkedList;
 import java.util.Properties;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -119,7 +119,7 @@ public class SparkInterpreterTest {
 
   @Test
   public void testNextLineInvocation() {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
   
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
   }
 
   @Test
@@ -128,6 +128,12 @@ public class SparkInterpreterTest {
   }
 
   @Test
+  public void testListener() {
+    SparkContext sc = repl.getSparkContext();
+    assertNotNull(SparkInterpreter.setupListeners(sc));
+  }
+
+  @Test
   public void testSparkSql(){
     repl.interpret("case class Person(name:String, age:Int)\n", context);
     repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);


Mime
View raw message