flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] incubator-flink git commit: [FLINK-1173] [streaming] Add socket text stream as a data source for the streaming API
Date Mon, 24 Nov 2014 15:32:03 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 42fe87494 -> 6dfb3fad9


[FLINK-1173] [streaming] Add socket text stream as a data source for the streaming API


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/80416acb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/80416acb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/80416acb

Branch: refs/heads/master
Commit: 80416acb89981b8c78e50ea3e6e374100214985e
Parents: 42fe874
Author: Chiwan Park <chiwanpark@icloud.com>
Authored: Sun Nov 16 19:33:02 2014 +0900
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Nov 24 11:33:20 2014 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  23 +++++
 .../source/SocketTextStreamFunction.java        |  69 +++++++++++++
 .../apache/flink/streaming/api/SourceTest.java  |  27 ++++-
 .../flink-streaming-examples/pom.xml            |  26 +++++
 .../wordcount/SocketTextStreamWordCount.java    | 100 +++++++++++++++++++
 5 files changed, 243 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80416acb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ce86516..82563ef 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
@@ -263,6 +264,40 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Creates a new DataStream that contains the strings received from a
+	 * socket. The source connects as a client to the given host and port;
+	 * received bytes are decoded using the system's default character set
+	 * and split into records at each occurrence of the delimiter. Carriage
+	 * return characters ('\r') are dropped.
+	 *
+	 * @param hostname
+	 *            The host name of the server socket to connect to.
+	 * @param port
+	 *            The port number of the server socket to connect to.
+	 * @param delimiter
+	 *            The character that separates records in the received text.
+	 * @return A DataStream, containing the strings received from the socket.
+	 */
+	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
+		return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
+	}
+
+	/**
+	 * Creates a new DataStream that contains the strings received from a
+	 * socket, using the newline character ('\n') as the record delimiter.
+	 * See {@link #socketTextStream(String, int, char)}.
+	 *
+	 * @param hostname
+	 *            The host name of the server socket to connect to.
+	 * @param port
+	 *            The port number of the server socket to connect to.
+	 * @return A DataStream, containing the strings received from the socket.
+	 */
+	public DataStreamSource<String> socketTextStream(String hostname, int port) {
+		return socketTextStream(hostname, port, '\n');
+	}
+
+	/**
 	 * Creates a new DataStream that contains a sequence of numbers.
 	 * 
 	 * @param from

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80416acb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
new file mode 100644
index 0000000..4811da5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.util.Collector;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+/**
+ * Source function that connects to a server socket and emits the received
+ * text as records, one record per delimiter-separated segment. Bytes are
+ * decoded with the system's default character set and carriage returns
+ * ('\r') are dropped.
+ */
+public class SocketTextStreamFunction implements SourceFunction<String> {
+	private static final long serialVersionUID = 1L;
+
+	private final String hostname;
+	private final int port;
+	private final char delimiter;
+
+	/**
+	 * Creates a source that reads from the given server socket.
+	 *
+	 * @param hostname
+	 *            The host name of the server socket to connect to.
+	 * @param port
+	 *            The port number of the server socket to connect to.
+	 * @param delimiter
+	 *            The character that separates records in the received text.
+	 */
+	public SocketTextStreamFunction(String hostname, int port, char delimiter) {
+		this.hostname = hostname;
+		this.port = port;
+		this.delimiter = delimiter;
+	}
+
+	/**
+	 * Connects to the configured host and port and emits records until the
+	 * remote side closes the connection. The socket is always closed before
+	 * this method returns, including when an exception is thrown.
+	 */
+	@Override
+	public void invoke(Collector<String> collector) throws Exception {
+		Socket socket = new Socket(hostname, port);
+		try {
+			// streamFromSocket reads until end of stream, so one call is enough.
+			// Looping on isConnected() would spin forever after EOF, because
+			// isConnected() keeps returning true once a connection was made.
+			streamFromSocket(collector, socket);
+		} finally {
+			socket.close();
+		}
+	}
+
+	/**
+	 * Reads characters from the socket and emits one record per delimiter.
+	 * Carriage returns are dropped, and any characters left after the last
+	 * delimiter are emitted as a final record. Reading stops at end of stream
+	 * or when the socket reports that it is closed or not connected. The
+	 * socket itself is not closed by this method.
+	 *
+	 * @param collector
+	 *            Receives the emitted records.
+	 * @param socket
+	 *            The connected socket to read from.
+	 * @throws Exception
+	 *             If reading from the socket fails.
+	 */
+	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
+		StringBuilder buffer = new StringBuilder();
+		BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+		while (true) {
+			int data = reader.read();
+			if (!socket.isConnected() || socket.isClosed() || data == -1) {
+				break;
+			}
+
+			if (data == delimiter) {
+				collector.collect(buffer.toString());
+				buffer.setLength(0);
+			} else if (data != '\r') { // ignore carriage return
+				buffer.append((char) data);
+			}
+		}
+
+		if (buffer.length() > 0) {
+			collector.collect(buffer.toString());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80416acb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
index afa8d91..156a8f2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -18,17 +18,24 @@
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
+import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.util.MockCollector;
 import org.apache.flink.streaming.util.MockSource;
 import org.junit.Test;
 
 public class SourceTest {
-	
+
 	@Test
 	public void fromElementsTest() {
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
@@ -42,11 +49,44 @@ public class SourceTest {
 		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));
 		assertEquals(expectedList, actualList);
 	}
-	
+
 	@Test
 	public void genSequenceTest() {
 		List<Long> expectedList = Arrays.asList(1L, 2L, 3L);
 		List<Long> actualList = MockSource.createAndExecute(new GenSequenceFunction(1, 3));
 		assertEquals(expectedList, actualList);
 	}
+
+	@Test
+	public void socketTextStreamTest() throws Exception {
+		List<String> expectedList = Arrays.asList("a", "b", "c");
+		List<String> actualList = new ArrayList<String>();
+
+		byte[] data = {'a', '\n', 'b', '\n', 'c', '\n'};
+
+		Socket socket = mock(Socket.class);
+		when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
+		when(socket.isClosed()).thenReturn(false);
+		when(socket.isConnected()).thenReturn(true);
+
+		new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(actualList), socket);
+		assertEquals(expectedList, actualList);
+	}
+
+	@Test
+	public void socketTextStreamCarriageReturnAndTrailingDataTest() throws Exception {
+		List<String> expectedList = Arrays.asList("a", "b", "c");
+		List<String> actualList = new ArrayList<String>();
+
+		// "\r\n" line endings, and a last record without a trailing delimiter
+		byte[] data = {'a', '\r', '\n', 'b', '\r', '\n', 'c'};
+
+		Socket socket = mock(Socket.class);
+		when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
+		when(socket.isClosed()).thenReturn(false);
+		when(socket.isConnected()).thenReturn(true);
+
+		new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(actualList), socket);
+		assertEquals(expectedList, actualList);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80416acb/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 04cdf52..075a468 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -229,6 +229,32 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
+
+					<!-- SocketTextStreamWordCount -->
+					<execution>
+						<id>SocketTextStreamWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SocketTextStreamWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.SocketTextStreamWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount$*.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80416acb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
new file mode 100644
index 0000000..85a63cc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * This example shows an implementation of WordCount with data from a socket.
+ *
+ * <p>
+ * Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; [&lt;result path&gt;]</code><br>
+ * If no result path is given, the word counts are printed to standard output.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use StreamExecutionEnvironment.socketTextStream
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ */
+public class SocketTextStreamWordCount {
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = env.socketTextStream(hostname, port);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new WordCount.Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+			.groupBy(0).sum(1);
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount with SocketTextStream Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static final String USAGE = "Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]";
+
+	private static boolean fileOutput = false;
+	private static String hostname;
+	private static int port;
+	private static String outputPath;
+
+	/**
+	 * Parses the command line into hostname, port and, optionally, outputPath.
+	 *
+	 * @param args
+	 *            {@code <hostname> <port> [<output path>]}
+	 * @return {@code true} if the program should run, {@code false} if the
+	 *         arguments were missing or invalid (a message has been printed)
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length == 0) {
+			System.out.println("Executing WordCount example with data from socket.");
+			System.out.println("  Provide parameters to connect data source.");
+			System.out.println("  " + USAGE);
+			return false;
+		}
+
+		if (args.length != 2 && args.length != 3) {
+			System.err.println(USAGE);
+			return false;
+		}
+
+		hostname = args[0];
+		try {
+			port = Integer.parseInt(args[1]);
+		} catch (NumberFormatException e) {
+			System.err.println("Invalid port number: " + args[1]);
+			System.err.println(USAGE);
+			return false;
+		}
+
+		if (args.length == 3) {
+			fileOutput = true;
+			outputPath = args[2];
+		}
+		return true;
+	}
+}


Mime
View raw message