flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [28/47] flink git commit: [FLINK-2852] [test-stability] Fix ScalaShellITSuite and ScalaShellLocalStartupITCase
Date Tue, 20 Oct 2015 07:59:19 GMT
[FLINK-2852] [test-stability] Fix ScalaShellITSuite and ScalaShellLocalStartupITCase

Changes the test program to use an int accumulator which is checked at the end of the program.
This avoids looking for the "Job status changed to FINISHED." string in the stdout output,
which is sometimes not printed if the JobExecutionResult arrives earlier than the
JobStatusChanged message at the JobClientActor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/630798d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/630798d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/630798d3

Branch: refs/heads/master
Commit: 630798d36f0e9cf0b7b6139ccfd8583ba0ae80b1
Parents: e3ad962
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Oct 15 02:07:53 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Oct 20 00:16:53 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-scala-shell/pom.xml         |  33 --
 .../org/apache/flink/api/scala/FlinkILoop.scala |   1 -
 .../org/apache/flink/api/scala/FlinkShell.scala |  38 ++-
 .../flink/api/scala/ScalaShellITCase.scala      | 332 +++++++++++++++++++
 .../flink/api/scala/ScalaShellITSuite.scala     | 286 ----------------
 .../scala/ScalaShellLocalStartupITCase.scala    |  75 +++--
 tools/log4j-travis.properties                   |   1 -
 7 files changed, 403 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
index c4b5bd6..371257b 100644
--- a/flink-staging/flink-scala-shell/pom.xml
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -195,39 +195,6 @@ under the License.
 			</plugin>
 
 			<plugin>
-				<groupId>org.scalatest</groupId>
-				<artifactId>scalatest-maven-plugin</artifactId>
-				<version>1.0</version>
-				<configuration>
-					<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-					<stdout>W</stdout> <!-- Skip coloring output -->
-				</configuration>
-				<executions>
-					<execution>
-						<id>scala-test</id>
-						<goals>
-							<goal>test</goal>
-						</goals>
-						<configuration>
-							<suffixes>(?&lt;!(IT|Integration))(Test|Suite|Case)</suffixes>
-							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dlog.dir=${log.dir} -Dmvn.forkNumber=1 -XX:-UseGCOverheadLimit</argLine>
-						</configuration>
-					</execution>
-					<execution>
-						<id>integration-test</id>
-						<phase>integration-test</phase>
-						<goals>
-							<goal>test</goal>
-						</goals>
-						<configuration>
-							<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
-							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dlog.dir=${log.dir} -Dmvn.forkNumber=1 -XX:-UseGCOverheadLimit</argLine>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
 				<groupId>org.scalastyle</groupId>
 				<artifactId>scalastyle-maven-plugin</artifactId>
 				<version>0.5.0</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index 9fb45a8..a26ac2e 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -233,6 +233,5 @@ HINT: You can use print() on a DataSet to print the contents to this shell.
   }
 
   def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 54bbf80..eb7f816 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -22,7 +22,7 @@ import java.io.{StringWriter, BufferedReader}
 
 import org.apache.flink.api.common.ExecutionMode
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
@@ -86,7 +86,7 @@ object FlinkShell {
           config.flinkShellExecutionMode,
           config.externalJars)
 
-      case _ => println("Could not parse program arguments")
+      case _ => System.out.println("Could not parse program arguments")
     }
   }
 
@@ -97,36 +97,41 @@ object FlinkShell {
       executionMode: ExecutionMode.Value,
       externalJars: Option[Array[String]] = None): Unit ={
     
-    println("Starting Flink Shell:")
+    System.out.println("Starting Flink Shell:")
 
     // either port or userhost not specified by user, create new minicluster
     val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
       executionMode match {
         case ExecutionMode.LOCAL =>
-          val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+          val config = new Configuration()
+          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+          val miniCluster = new LocalFlinkMiniCluster(config, false)
           miniCluster.start()
           val port = miniCluster.getLeaderRPCPort
-          println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+          System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
           ("localhost", port, Some(miniCluster))
 
         case ExecutionMode.REMOTE =>
           if (userHost == "none" || userPort == -1) {
-            println("Error: <host> or <port> not specified!")
+            System.out.println("Error: <host> or <port> not specified!")
             return
           } else {
-            println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+            System.out.println(
+              s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
             (userHost, userPort, None)
           }
 
         case ExecutionMode.UNDEFINED =>
-          println("Error: please specify execution mode:")
-          println("[local | remote <host> <port>]")
+          System.out.println("Error: please specify execution mode:")
+          System.out.println("[local | remote <host> <port>]")
           return
       }
 
+    var repl: Option[FlinkILoop] = None
+
     try {
       // custom shell
-      val repl: FlinkILoop =
+      repl = Some(
         bufferedReader match {
 
           case Some(br) =>
@@ -135,21 +140,20 @@ object FlinkShell {
 
           case None =>
             new FlinkILoop(host, port, externalJars)
-        }
+        })
 
       val settings = new Settings()
 
       settings.usejavacp.value = true
+      settings.Yreplsync.value = true
 
       // start scala interpreter shell
-      repl.process(settings)
+      repl.foreach(_.process(settings))
     } finally {
-      cluster match {
-        case Some(c) => c.stop()
-        case None =>
-      }
+      repl.foreach(_.closeInterpreter())
+      cluster.foreach(_.stop())
     }
 
-    println(" good bye ..")
+    System.out.println(" good bye ..")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
new file mode 100644
index 0000000..de2f3ec
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.util.TestLogger
+import org.junit.{AfterClass, BeforeClass, Test, Assert}
+
+import scala.concurrent.duration.FiniteDuration
+import scala.tools.nsc.Settings
+
+class ScalaShellITCase extends TestLogger {
+
+  import ScalaShellITCase._
+
+  /** Prevent re-creation of environment */
+  @Test
+  def testPreventRecreation(): Unit = {
+
+    val input: String =
+      """
+        val env = ExecutionEnvironment.getExecutionEnvironment
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    Assert.assertTrue(output.contains(
+      "UnsupportedOperationException: Execution Environment is already " +
+      "defined for this shell"))
+  }
+
+  /** Iteration test with iterative Pi example */
+  @Test
+  def testIterativePI(): Unit = {
+
+    val input: String =
+      """
+        val initial = env.fromElements(0)
+        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+          val result = iterationInput.map { i =>
+            val x = Math.random()
+            val y = Math.random()
+            i + (if (x * x + y * y < 1) 1 else 0)
+          }
+          result
+        }
+        val result = count map { c => c / 10000.0 * 4 }
+        result.collect()
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+
+  /** WordCount in Shell */
+  @Test
+  def testWordCount(): Unit = {
+    val input =
+      """
+        val text = env.fromElements("To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,")
+        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+        val result = counts.print()
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    // some of the words that should be included
+    Assert.assertTrue(output.contains("(a,1)"))
+    Assert.assertTrue(output.contains("(whether,1)"))
+    Assert.assertTrue(output.contains("(to,4)"))
+    Assert.assertTrue(output.contains("(arrows,1)"))
+  }
+
+  /** Sum 1..10, should be 55 */
+  @Test
+  def testSum: Unit = {
+    val input =
+      """
+        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
+        val reduced = input.reduce(_+_)
+        reduced.print
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("55"))
+  }
+
+  /** WordCount in Shell with custom case class */
+  @Test
+  def testWordCountWithCustomCaseClass: Unit = {
+    val input =
+      """
+      case class WC(word: String, count: Int)
+      val wordCounts = env.fromElements(
+        new WC("hello", 1),
+        new WC("world", 2),
+        new WC("world", 8))
+      val reduced = wordCounts.groupBy(0).sum(1)
+      reduced.print()
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("WC(hello,1)"))
+    Assert.assertTrue(output.contains("WC(world,10)"))
+  }
+
+  /** Submit external library */
+  @Test
+  def testSubmissionOfExternalLibrary: Unit = {
+    val input =
+      """
+        import org.apache.flink.ml.math._
+        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
+        denseVectors.print()
+      """.stripMargin
+
+    // find jar file that contains the ml code
+    var externalJar = ""
+    val folder = new File("../flink-ml/target/")
+    val listOfFiles = folder.listFiles()
+
+    for (i <- listOfFiles.indices) {
+      val filename: String = listOfFiles(i).getName
+      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
+        ".jar")) {
+        externalJar = listOfFiles(i).getAbsolutePath
+      }
+    }
+
+    assert(externalJar != "")
+
+    val output: String = processInShell(input, Option(externalJar))
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("error"))
+    Assert.assertFalse(output.contains("Exception"))
+
+    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+  }
+
+
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  @Test
+  def testRemoteCluster: Unit = {
+
+    val input: String =
+      """
+        |import org.apache.flink.api.common.functions.RichMapFunction
+        |import org.apache.flink.api.java.io.PrintingOutputFormat
+        |import org.apache.flink.api.common.accumulators.IntCounter
+        |import org.apache.flink.configuration.Configuration
+        |
+        |val els = env.fromElements("foobar","barfoo")
+        |val mapped = els.map{
+        | new RichMapFunction[String, String]() {
+        |   var intCounter: IntCounter = _
+        |   override def open(conf: Configuration): Unit = {
+        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
+        |   }
+        |
+        |   def map(element: String): String = {
+        |     intCounter.add(1)
+        |     element
+        |   }
+        | }
+        |}
+        |mapped.output(new PrintingOutputFormat())
+        |val executionResult = env.execute("Test Job")
+        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
+        |
+        |:q
+      """.stripMargin
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match{
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        throw new AssertionError("Cluster creation failed.")
+    }
+
+    //start scala shell with initialized
+    // buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush()
+
+    val output: String = baos.toString
+    System.setOut(oldOut)
+
+    Assert.assertTrue(output.contains("IntCounter: 2"))
+    Assert.assertTrue(output.contains("foobar"))
+    Assert.assertTrue(output.contains("barfoo"))
+
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("Error"))
+    Assert.assertFalse(output.contains("ERROR"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
+}
+
+object ScalaShellITCase {
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  @BeforeClass
+  def beforeAll(): Unit = {
+    val cl = TestBaseUtils.startCluster(
+      1,
+      parallelism,
+      StreamingMode.BATCH_ONLY,
+      false,
+      false,
+      false)
+
+    cluster = Some(cl)
+  }
+
+  @AfterClass
+  def afterAll(): Unit = {
+    // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
+    Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
+    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+  }
+
+  /**
+   * Run the input using a Scala Shell and return the output of the shell.
+   * @param input commands to be processed in the shell
+   * @return output of shell
+   */
+  def processInShell(input: String, externalJars: Option[String] = None): String = {
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val baos = new ByteArrayOutputStream()
+
+    val oldOut = System.out
+    System.setOut(new PrintStream(baos))
+
+    // new local cluster
+    val host = "localhost"
+    val port = cluster match {
+      case Some(c) => c.getLeaderRPCPort
+      case _ => throw new RuntimeException("Test cluster not initialized.")
+    }
+
+    val repl = externalJars match {
+      case Some(ej) => new FlinkILoop(
+        host, port,
+        Option(Array(ej)),
+        in, new PrintWriter(out))
+
+      case None => new FlinkILoop(
+        host, port,
+        in, new PrintWriter(out))
+    }
+
+    repl.settings = new Settings()
+
+    // enable this line to use scala in intellij
+    repl.settings.usejavacp.value = true
+
+    externalJars match {
+      case Some(ej) => repl.settings.classpath.value = ej
+      case None =>
+    }
+
+    repl.process(repl.settings)
+
+    repl.closeInterpreter()
+
+    System.setOut(oldOut)
+
+    baos.flush()
+
+    val stdout = baos.toString
+
+    out.toString + stdout
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
deleted file mode 100644
index c8b1990..0000000
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io._
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-import scala.concurrent.duration.FiniteDuration
-import scala.tools.nsc.Settings
-
-@RunWith(classOf[JUnitRunner])
-class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
-
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
-
-  test("Prevent re-creation of environment") {
-
-    val input: String =
-      """
-        val env = ExecutionEnvironment.getExecutionEnvironment
-      """.stripMargin
-
-    val output: String = processInShell(input)
-
-    output should include("UnsupportedOperationException: Execution Environment is already " +
-      "defined for this shell")
-  }
-
-  test("Iteration test with iterative Pi example") {
-
-    val input: String =
-      """
-        val initial = env.fromElements(0)
-        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
-          val result = iterationInput.map { i =>
-            val x = Math.random()
-            val y = Math.random()
-            i + (if (x * x + y * y < 1) 1 else 0)
-          }
-          result
-        }
-        val result = count map { c => c / 10000.0 * 4 }
-        result.collect()
-      """.stripMargin
-
-    val output: String = processInShell(input)
-
-    output should not include "failed"
-    output should not include "error"
-    output should not include "Exception"
-  }
-
-  test("WordCount in Shell") {
-    val input =
-      """
-        val text = env.fromElements("To be, or not to be,--that is the question:--",
-        "Whether 'tis nobler in the mind to suffer",
-        "The slings and arrows of outrageous fortune",
-        "Or to take arms against a sea of troubles,")
-        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
-        val result = counts.print()
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    output should not include "failed"
-    output should not include "error"
-    output should not include "Exception"
-
-    // some of the words that should be included
-    output should include("(a,1)")
-    output should include("(whether,1)")
-    output should include("(to,4)")
-    output should include("(arrows,1)")
-  }
-
-  test("Sum 1..10, should be 55") {
-    val input =
-      """
-        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
-        val reduced = input.reduce(_+_)
-        reduced.print
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    output should not include "failed"
-    output should not include "error"
-    output should not include "Exception"
-
-    output should include("55")
-  }
-
-  test("WordCount in Shell with custom case class") {
-    val input =
-      """
-      case class WC(word: String, count: Int)
-      val wordCounts = env.fromElements(
-        new WC("hello", 1),
-        new WC("world", 2),
-        new WC("world", 8))
-      val reduced = wordCounts.groupBy(0).sum(1)
-      reduced.print()
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    output should not include "failed"
-    output should not include "error"
-    output should not include "Exception"
-
-    output should include("WC(hello,1)")
-    output should include("WC(world,10)")
-  }
-
-  test("Submit external library") {
-    val input =
-      """
-        import org.apache.flink.ml.math._
-        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
-        denseVectors.print()
-      """.stripMargin
-
-    // find jar file that contains the ml code
-    var externalJar = ""
-    val folder = new File("../flink-ml/target/")
-    val listOfFiles = folder.listFiles()
-
-    for (i <- listOfFiles.indices) {
-      val filename: String = listOfFiles(i).getName
-      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
-        ".jar")) {
-        println("ive found file:" + listOfFiles(i).getAbsolutePath)
-        externalJar = listOfFiles(i).getAbsolutePath
-      }
-    }
-
-    assert(externalJar != "")
-
-    val output: String = processInShell(input, Option(externalJar))
-
-    output should not include "failed"
-    output should not include "error"
-    output should not include "Exception"
-
-    output should include("\nDenseVector(1.0, 2.0, 3.0)")
-  }
-
-  /**
-   * Run the input using a Scala Shell and return the output of the shell.
-   * @param input commands to be processed in the shell
-   * @return output of shell
-   */
-  def processInShell(input: String, externalJars: Option[String] = None): String = {
-    val in = new BufferedReader(new StringReader(input + "\n"))
-    val out = new StringWriter()
-    val baos = new ByteArrayOutputStream()
-
-    val oldOut = System.out
-    System.setOut(new PrintStream(baos))
-
-    // new local cluster
-    val host = "localhost"
-    val port = cluster match {
-      case Some(c) => c.getLeaderRPCPort
-      case _ => throw new RuntimeException("Test cluster not initialized.")
-    }
-
-    val repl = externalJars match {
-      case Some(ej) => new FlinkILoop(
-        host, port,
-        Option(Array(ej)),
-        in, new PrintWriter(out))
-
-      case None => new FlinkILoop(
-        host, port,
-        in, new PrintWriter(out))
-    }
-
-    repl.settings = new Settings()
-
-    // enable this line to use scala in intellij
-    repl.settings.usejavacp.value = true
-
-    externalJars match {
-      case Some(ej) => repl.settings.classpath.value = ej
-      case None =>
-    }
-
-    repl.process(repl.settings)
-
-    repl.closeInterpreter()
-
-    System.setOut(oldOut)
-
-    baos.flush()
-
-    val stdout = baos.toString
-
-    out.toString + stdout
-  }
-
-  /**
-   * tests flink shell startup with remote cluster (starts cluster internally)
-   */
-  test("start flink scala shell with remote cluster") {
-
-    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
-      "els.print\nError\n:q\n"
-
-    val in: BufferedReader = new BufferedReader(
-      new StringReader(
-        input + "\n"))
-    val out: StringWriter = new StringWriter
-
-    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
-    val oldOut: PrintStream = System.out
-    System.setOut(new PrintStream(baos))
-
-    val (c, args) = cluster match{
-      case Some(cl) =>
-        val arg = Array("remote",
-          cl.hostname,
-          Integer.toString(cl.getLeaderRPCPort))
-        (cl, arg)
-      case None =>
-        fail("Cluster creation failed!")
-    }
-
-    //start scala shell with initialized
-    // buffered reader for testing
-    FlinkShell.bufferedReader = Some(in)
-    FlinkShell.main(args)
-    baos.flush()
-
-    val output: String = baos.toString
-    System.setOut(oldOut)
-
-    output should include("Job execution switched to status FINISHED.")
-    output should include("a\nb")
-
-    output should not include "Error"
-    output should not include "ERROR"
-    output should not include "Exception"
-    output should not include "failed"
-  }
-
-  override def beforeAll(): Unit = {
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      StreamingMode.BATCH_ONLY,
-      false,
-      false,
-      false)
-
-    cluster = Some(cl)
-  }
-
-  override def afterAll(): Unit = {
-    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index 60da09e..57bbd9b 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,20 +20,44 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.junit.runner.RunWith
-import org.scalatest.{Matchers, FunSuite}
-import org.scalatest.junit.JUnitRunner
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.junit.Assert
 
+class ScalaShellLocalStartupITCase extends TestLogger {
 
-@RunWith(classOf[JUnitRunner])
-class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
-
-    /**
-     * tests flink shell with local setup through startup script in bin folder
-     */
-    test("start flink scala shell with local cluster") {
-
-      val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+  /**
+   * tests flink shell with local setup through startup script in bin folder
+   */
+  @Test
+  def testLocalCluster: Unit = {
+    val input: String =
+      """
+        |import org.apache.flink.api.common.functions.RichMapFunction
+        |import org.apache.flink.api.java.io.PrintingOutputFormat
+        |import org.apache.flink.api.common.accumulators.IntCounter
+        |import org.apache.flink.configuration.Configuration
+        |
+        |val els = env.fromElements("foobar","barfoo")
+        |val mapped = els.map{
+        | new RichMapFunction[String, String]() {
+        |   var intCounter: IntCounter = _
+        |   override def open(conf: Configuration): Unit = {
+        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
+        |   }
+        |
+        |   def map(element: String): String = {
+        |     intCounter.add(1)
+        |     element
+        |   }
+        | }
+        |}
+        |mapped.output(new PrintingOutputFormat())
+        |val executionResult = env.execute("Test Job")
+        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
+        |
+        |:q
+      """.stripMargin
       val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
       val out: StringWriter = new StringWriter
       val baos: ByteArrayOutputStream = new ByteArrayOutputStream
@@ -41,20 +65,21 @@ class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
       System.setOut(new PrintStream(baos))
       val args: Array[String] = Array("local")
 
-      //start flink scala shell
-      FlinkShell.bufferedReader = Some(in);
-      FlinkShell.main(args)
+    //start flink scala shell
+    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.main(args)
 
-      baos.flush()
-      val output: String = baos.toString
-      System.setOut(oldOut)
+    baos.flush()
+    val output: String = baos.toString
+    System.setOut(oldOut)
 
-      output should include("Job execution switched to status FINISHED.")
-      output should include("a\nb")
+    Assert.assertTrue(output.contains("IntCounter: 2"))
+    Assert.assertTrue(output.contains("foobar"))
+    Assert.assertTrue(output.contains("barfoo"))
 
-      output should not include "Error"
-      output should not include "ERROR"
-      output should not include "Exception"
-      output should not include "failed"
-    }
+    Assert.assertFalse(output.contains("failed"))
+    Assert.assertFalse(output.contains("Error"))
+    Assert.assertFalse(output.contains("ERROR"))
+    Assert.assertFalse(output.contains("Exception"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/630798d3/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index d55209e..53379b4 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -40,7 +40,6 @@ log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG
 log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
-log4j.logger.org.apache.flink.runtime.executiongraph=DEBUG
 
 # Log a bit when running the flink-yarn-tests to avoid running into the 5 minutes timeout for
 # the tests


Mime
View raw message