flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [3/3] flink git commit: [streaming] [scala] scala SocketTextStream added and minor fixes
Date Thu, 08 Jan 2015 14:54:23 GMT
[streaming] [scala] scala SocketTextStream added and minor fixes

Organized imports for streaming scala examples
Added template parameter for scala streaming iterate
Minor fixes in streaming examples


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b22406a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b22406a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b22406a6

Branch: refs/heads/master
Commit: b22406a6e27a9528452d62602fe25c416633467b
Parents: 19066b5
Author: mbalassi <mbalassi@apache.org>
Authored: Tue Jan 6 22:50:18 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu Jan 8 13:35:28 2015 +0100

----------------------------------------------------------------------
 .../socket/SocketTextStreamWordCount.java       | 59 +++++++------
 .../socket/SocketTextStreamWordCount.scala      | 91 ++++++++++++++++++++
 .../examples/windowing/TopSpeedWindowing.scala  |  5 +-
 .../scala/examples/windowing/WindowJoin.scala   | 11 +--
 .../flink/streaming/api/scala/DataStream.scala  |  4 +-
 5 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
index ec32e9f..e9b60f4 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -23,11 +23,21 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
 
 /**
- * This example shows an implementation of WordCount with data from socket.
- *
+ * This example shows an implementation of WordCount with data from a text
+ * socket. To run the example make sure that the service providing the text data
+ * is already up and running.
+ * 
  * <p>
- * Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port &gt; &lt;result path&gt;</code><br>
- *
+ * To start an example socket text stream on your local machine run netcat from
+ * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
+ * port number.
+ * 
+ * 
+ * <p>
+ * Usage:
+ * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * <br>
+ * 
  * <p>
  * This example shows how to:
  * <ul>
@@ -35,6 +45,8 @@ import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
  * <li>write a simple Flink program,
  * <li>write and use user-defined functions.
  * </ul>
+ * 
+ * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
  */
 public class SocketTextStreamWordCount {
 	public static void main(String[] args) throws Exception {
@@ -44,16 +56,18 @@ public class SocketTextStreamWordCount {
 		}
 
 		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
 
 		// get input data
-		DataStream<String> text = env.socketTextStream(hostname, port);
+		DataStream<String> text = env.socketTextStream(hostName, port);
 
 		DataStream<Tuple2<String, Integer>> counts =
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new Tokenizer())
 		// group by the tuple field "0" and sum up tuple field "1"
-			.groupBy(0).sum(1);
+				.groupBy(0)
+				.sum(1);
 
 		if (fileOutput) {
 			counts.writeAsText(outputPath, 1);
@@ -62,7 +76,7 @@ public class SocketTextStreamWordCount {
 		}
 
 		// execute program
-		env.execute("WordCount with SocketTextStream Example");
+		env.execute("WordCount from SocketTextStream Example");
 	}
 
 	// *************************************************************************
@@ -70,30 +84,23 @@ public class SocketTextStreamWordCount {
 	// *************************************************************************
 
 	private static boolean fileOutput = false;
-	private static String hostname;
+	private static String hostName;
 	private static int port;
 	private static String outputPath;
 
 	private static boolean parseParameters(String[] args) {
 
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 3) {
-				fileOutput = true;
-				hostname = args[0];
-				port = Integer.valueOf(args[1]);
-				outputPath = args[2];
-			} else if (args.length == 2) {
-				hostname = args[0];
-				port = Integer.valueOf(args[1]);
-			} else {
-				System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
-				return false;
-			}
+		// parse input arguments
+		if (args.length == 3) {
+			fileOutput = true;
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
+			outputPath = args[2];
+		} else if (args.length == 2) {
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
 		} else {
-			System.out.println("Executing WordCount example with data from socket.");
-			System.out.println("  Provide parameters to connect data source.");
-			System.out.println("  Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
+			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
 			return false;
 		}
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
new file mode 100644
index 0000000..b38764c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.socket
+
+import org.apache.flink.streaming.api.scala._
+
+/**
+ * This example shows an implementation of WordCount with data from a text socket. 
+ * To run the example make sure that the service providing the text data is already up and running.
+ *
+ * To start an example socket text stream on your local machine run netcat from a command line, 
+ * where the parameter specifies the port number:
+ *
+ * {{{
+ *   nc -lk 9999
+ * }}}
+ *
+ * Usage:
+ * {{{
+ *   SocketTextStreamWordCount <hostname> <port> <output path>
+ * }}}
+ *
+ * This example shows how to:
+ *
+ *   - use StreamExecutionEnvironment.socketTextStream
+ *   - write a simple Flink Streaming program in scala.
+ *   - write and use user-defined functions.
+ */
+object SocketTextStreamWordCount {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the corresponding objects
+    val text = env.socketTextStream(hostName, port)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    if (fileOutput) {
+      counts.writeAsCsv(outputPath, 1)
+    } else {
+      counts print
+    }
+
+    env.execute("Scala SocketTextStreamWordCount Example")
+  }
+
+  private def parseParameters(args: Array[String]): Boolean = {
+      if (args.length == 3) {
+        fileOutput = true
+        hostName = args(0)
+        port = args(1).toInt
+        outputPath = args(2)
+      } else if (args.length == 2) {
+        hostName = args(0)
+        port = args(1).toInt
+      } else {
+        System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
+        return false
+      }
+    true
+  }
+
+  private var fileOutput: Boolean = false
+  private var hostName: String = null
+  private var port: Int = 0
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index a43f479..e3ef95e 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -18,12 +18,11 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-
 import java.util.concurrent.TimeUnit._
 
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.scala.windowing.{Delta, Time}
-import org.apache.flink.api.scala._
+
 import scala.Stream._
 import scala.math._
 import scala.util.Random

http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index d6c0363..0b78365 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala._
 
 import scala.Stream._
 import scala.util.Random
@@ -39,11 +38,13 @@ object WindowJoin {
     val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
     val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
 
-    //Join the two input streams by id on the last second every 2 seconds and create new
+    //Join the two input streams by id on the last 2 seconds every second and create new
     //Person objects containing both name and age
     val joined =
-      names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS)
-                      .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
+      names.join(ages).onWindow(2, TimeUnit.SECONDS)
+                      .every(1, TimeUnit.SECONDS)
+                      .where("id")
+                      .equalTo("id") { (n, a) => Person(n.name, a.age) }
 
     joined print
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b22406a6/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 6d94de7..ffe91cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -214,8 +214,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    *
    */
-  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]),  maxWaitTimeMillis:
-    Long = 0): DataStream[T] = {
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),  
+        maxWaitTimeMillis:Long = 0): DataStream[R] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))


Mime
View raw message