flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] flink git commit: Some code cleanups on Scala Shell
Date Thu, 28 May 2015 13:48:59 GMT
Some code cleanups on Scala Shell


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb6de22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb6de22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb6de22

Branch: refs/heads/master
Commit: 6bb6de22fb6602058430d6d4eebf7cf36e404aca
Parents: 717f881
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue May 26 14:35:53 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu May 28 15:48:17 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/RemoteEnvironment.java       | 14 +----
 .../api/java/ScalaShellRemoteEnvironment.java   | 29 +++--------
 .../org.apache.flink/api/scala/FlinkILoop.scala | 54 ++++++++------------
 .../org.apache.flink/api/scala/FlinkShell.scala | 28 +++++-----
 4 files changed, 44 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index a2f2891..6f84077 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.common.PlanExecutor;
  */
 public class RemoteEnvironment extends ExecutionEnvironment {
 	
-	private final String host;
+	protected final String host;
 	
-	private final int port;
+	protected final int port;
 	
 	private final String[] jarFiles;
 	
@@ -87,14 +87,4 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
-
-
-	// needed to call execute on ScalaShellRemoteEnvironment
-	public int getPort() {
-		return this.port;
-	}
-
-	public String getHost() {
-		return this.host;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
index cb470c9..79f9576 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
@@ -25,15 +25,12 @@ import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
 
-import java.io.File;
-
 /**
- * ScalaShellRemoteEnvironment references the JobManager through host and port parameters,
- * and the Scala Shell (FlinkILoop).
- * Upon calling execute(), it reads compiled lines of the Scala shell, aggregates them to a Jar
- * and sends aggregated jar to the JobManager.
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
+ * use the reference of the ILoop to write the compiled classes of the current session to
+ * a Jar file and submit these with the program.
  */
-
 public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 
 	// reference to Scala Shell, for access to virtual directory
@@ -62,22 +59,12 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 
-		// write virtual files to disk first
-		JarHelper jh = new JarHelper();
-
-		flinkILoop.writeFilesToDisk();
-
-		// jarr up.
-		File inFile = new File( flinkILoop.getTmpDirShell().getAbsolutePath());
-		File outFile = new File( flinkILoop.getTmpJarShell().getAbsolutePath());
-
-		jh.jarDir(inFile, outFile);
-
-		String[] jarFiles = {outFile.getAbsolutePath()};
+		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
 
 		// call "traditional" execution methods
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(super.getHost(), super.getPort(), jarFiles);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 0de4953..83d0c72 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -20,17 +20,17 @@ package org.apache.flink.api.scala
 
 import java.io.{BufferedReader, File, FileOutputStream}
 
-import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
 import org.apache.flink.util.AbstractID
 
 
-class FlinkILoop(val host: String,
-                 val port: Int,
-                 in0: Option[BufferedReader],
-                 out0: JPrintWriter)
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
   extends ILoop(in0, out0) {
 
   def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
@@ -52,11 +52,6 @@ class FlinkILoop(val host: String,
     scalaEnv
   }
 
-
-  /**
-   * CUSTOM START METHODS OVERRIDE:
-   */
-
   addThunk {
     intp.beQuietDuring {
       // automatically imports the flink scala api
@@ -68,7 +63,6 @@ class FlinkILoop(val host: String,
   }
 
 
-
   /**
    * creates a temporary directory to store compiled console files
    */
@@ -96,18 +90,20 @@ class FlinkILoop(val host: String,
 
 
   /**
-   * writes contents of the compiled lines that have been executed in the shell into a
-   * "physical directory": creates a unique temporary directory
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
    */
-  def writeFilesToDisk(): Unit = {
+  def writeFilesToDisk(): File = {
     val vd = intp.virtualDirectory
 
-    var vdIt = vd.iterator
+    val vdIt = vd.iterator
 
     for (fi <- vdIt) {
       if (fi.isDirectory) {
 
-        var fiIt = fi.iterator
+        val fiIt = fi.iterator
 
         for (f <- fiIt) {
 
@@ -128,6 +124,14 @@ class FlinkILoop(val host: String,
         }
       }
     }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
   }
 
   /**
@@ -183,20 +187,4 @@ NOTE: Use the prebound Execution Environment "env" to read data and execute your
 HINT: You can use print() on a DataSet to print the contents to this shell.
       """)
   }
-
-  //  getter functions:
-  // get (root temporary folder)
-  def getTmpDirBase(): File = {
-    return (this.tmpDirBase);
-  }
-
-  // get shell folder name inside tmp dir
-  def getTmpDirShell(): File = {
-    return (this.tmpDirShell)
-  }
-
-  // get tmp jar file name
-  def getTmpJarShell(): File = {
-    return (this.tmpJarShell)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 90615ec..7ad1d2f 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 
-
-
 object FlinkShell {
 
   def main(args: Array[String]) {
@@ -33,28 +31,29 @@ object FlinkShell {
     // scopt, command line arguments
     case class Config(port: Int = -1,
                       host: String = "none")
-    val parser = new scopt.OptionParser[Config] ("scopt") {
-      head ("scopt", "3.x")
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
       opt[Int] ('p', "port") action {
         (x, c) =>
           c.copy (port = x)
-      } text ("port specifies port of running JobManager")
+      } text("port specifies port of running JobManager")
+
       opt[(String)] ('h',"host") action {
         case (x, c) =>
           c.copy (host = x)
-      }  text ("host specifies host name of running JobManager")
-      help("help") text("prints this usage text")
+      }  text("host specifies host name of running JobManager")
 
+      help("help") text("prints this usage text")
     }
 
 
     // parse arguments
-    parser.parse (args, Config () ) map {
-      config =>
-        startShell(config.host,config.port);
-    } getOrElse {
-      // arguments are bad, usage message will have been displayed
-      println("Could not parse program arguments")
+    parser.parse (args, Config () ) match {
+      case Some(config) =>
+        startShell(config.host,config.port)
+
+      case _ => println("Could not parse program arguments")
     }
   }
 
@@ -65,8 +64,7 @@ object FlinkShell {
     var cluster: LocalFlinkMiniCluster = null
 
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 )
-    {
+    val (host,port) = if (userHost == "none" || userPort == -1 ) {
       println("Creating new local server")
       cluster = new LocalFlinkMiniCluster(new Configuration, false)
       ("localhost",cluster.getJobManagerRPCPort)


Mime
View raw message