flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-3551] [examples] Sync Scala streaming examples with Java examples.
Date Thu, 29 Jun 2017 21:58:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master e3bef5569 -> 55ab34ff3


http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
new file mode 100644
index 0000000..0a26a35
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample
+import org.apache.flink.streaming.scala.examples.join.WindowJoin
+import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
+import org.apache.flink.streaming.scala.examples.ml.IncrementalLearningSkeleton
+import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
+import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount
+import org.apache.flink.streaming.test.examples.join.WindowJoinData
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit.Test
+
+/**
+ * Integration test for streaming programs in Scala examples.
+ */
+class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+
+  // Runs the iterative Fibonacci example end-to-end. Only checks that the job
+  // completes; see the note below for why the output itself is not validated.
+  @Test
+  def testIterateExample(): Unit = {
+    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+    val resultPath = getTempDirPath("result")
+
+    // the example is inherently non-deterministic. The iteration timeout of 5000 ms
+    // is frequently not enough to make the test run stable on CI infrastructure
+    // with very small containers, so we cannot do a validation here
+    IterateExample.main(Array(
+      "--input", inputPath,
+      "--output", resultPath
+    ))
+  }
+
+  // Builds grade and salary streams from the shared WindowJoinData fixtures,
+  // joins them via WindowJoin.joinStreams, and validates only the *shape* of
+  // each output line (Person(name,grade,salary)) since join order is not fixed.
+  @Test
+  def testWindowJoin(): Unit = {
+    // writeAsText below treats this path as a directory, hence the
+    // deleteDirectory cleanup in the finally block.
+    val resultPath = File.createTempFile("result-path", "dir").toURI.toString
+    try {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+      val grades = env
+        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          // fields(0) is intentionally skipped; only the second and third
+          // CSV fields are used to build the Grade record.
+          Grade(fields(1), fields(2).toInt)
+        })
+
+      val salaries = env
+        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          // same layout as the grades input: skip fields(0), use name + amount
+          Salary(fields(1), fields(2).toInt)
+        })
+
+      WindowJoin.joinStreams(grades, salaries, 100)
+        .writeAsText(resultPath, WriteMode.OVERWRITE)
+
+      env.execute()
+
+      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
+    }
+    finally try
+      // best-effort cleanup of the temporary result directory
+      FileUtils.deleteDirectory(new File(resultPath))
+
+    catch {
+      // NOTE(review): deliberately swallows ALL Throwables so a cleanup
+      // failure cannot mask the real test outcome; confirm this should not
+      // at least be narrowed to NonFatal.
+      case _: Throwable =>
+    }
+  }
+
+  // Runs the incremental-learning skeleton example and compares its output
+  // against the expected results, line by line (order-insensitive).
+  @Test
+  def testIncrementalLearningSkeleton(): Unit = {
+    val resultPath = getTempDirPath("result")
+    IncrementalLearningSkeleton.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS,
resultPath)
+  }
+
+  // Runs the Twitter example on its bundled sample data and compares the
+  // streaming word counts against the expected tuples.
+  @Test
+  def testTwitterExample(): Unit = {
+    val resultPath = getTempDirPath("result")
+    TwitterExample.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(
+      TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
+      resultPath)
+  }
+
+  // Runs the session-windowing example and checks its output exactly against
+  // the expected session results.
+  @Test
+  def testSessionWindowing(): Unit = {
+    val resultPath = getTempDirPath("result")
+    SessionWindowing.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
+  }
+
+  // Runs WindowWordCount with a fixed window/slide configuration and checks
+  // only the output format, not the exact counts (see note below).
+  @Test
+  def testWindowWordCount(): Unit = {
+    val windowSize = "250"
+    val slideSize = "150"
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
+
+    WindowWordCount.main(Array(
+      "--input", textPath,
+      "--output", resultPath,
+      "--window", windowSize,
+      "--slide", slideSize
+    ))
+
+    // since the parallel tokenizers might have different speed
+    // the exact output can not be checked just whether it is well-formed
+    // checks that the result lines look like e.g. (faust, 2)
+    TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)")
+  }
+
+  // Runs the streaming WordCount example on the shared WordCountData text and
+  // compares the counts against the expected streaming tuples.
+  @Test
+  def testWordCount(): Unit = {
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
+
+    WordCount.main(Array(
+      "--input", textPath,
+      "--output", resultPath
+    ))
+
+    TestBaseUtils.compareResultsByLinesInMemory(
+      WordCountData.STREAMING_COUNTS_AS_TUPLES,
+      resultPath)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
deleted file mode 100644
index 0e67be5..0000000
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.scala.examples.join.WindowJoin
-import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
-import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-
-class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
-  
-  @Test
-  def testProgram(): Unit = {
-    
-    val resultPath: String = File.createTempFile("result-path", "dir").toURI().toString()
-    try {
-      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-      
-      val grades: DataStream[Grade] = env
-        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
-        .map( line => {
-          val fields = line.split(",")
-          Grade(fields(1), fields(2).toInt)
-        })
-
-      val salaries: DataStream[Salary] = env
-        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
-        .map( line => {
-          val fields = line.split(",")
-          Salary(fields(1), fields(2).toInt)
-        })
-      
-      WindowJoin.joinStreams(grades, salaries, 100)
-        .writeAsText(resultPath, WriteMode.OVERWRITE)
-      
-      env.execute()
-
-      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
-    }
-    finally {
-      try {
-        FileUtils.deleteDirectory(new File(resultPath))
-      }
-      catch {
-        case _ : Throwable => 
-      }
-    }
-  }
-
-}


Mime
View raw message