flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/2] flink git commit: [FLINK-1582][streaming]Allow SocketStream to reconnect when socket closes.
Date Wed, 25 Feb 2015 12:22:17 GMT
[FLINK-1582][streaming]Allow SocketStream to reconnect when socket closes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd65a241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd65a241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd65a241

Branch: refs/heads/master
Commit: fd65a2411ddf8f80d69e8de425bd8805de300195
Parents: 2fe3deb
Author: mingliang <qmlmoon@gmail.com>
Authored: Thu Feb 19 16:53:32 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Feb 25 13:18:19 2015 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 29 ++++++++++++--
 .../source/SocketTextStreamFunction.java        | 42 +++++++++++++++++---
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd65a241/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4a3e5a2..8487fa5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -375,6 +375,29 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
 	 * character set.
+	 *
+	 * @param hostname
+	 *            The host name which a server socket bind.
+	 * @param port
+	 *            The port number which a server socket bind. A port number of 0
+	 *            means that the port number is automatically allocated.
+	 * @param delimiter
+	 *            A character which split received strings into records.
+	 * @param maxRetry
+	 *            The maximal retry number when the socket is down. Reconnection is
+	 *            tried in every 5 seconds. A number of 0 means that the reader
+	 *            is immediately terminated.
+	 * @return A DataStream, containing the strings received from socket.
+	 */
+	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, int maxRetry) {
+		return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null,
+			"Socket Stream");
+	}
+
+	/**
+	 * Creates a new DataStream that contains the strings received infinitely
+	 * from socket. Received strings are decoded by the system's default
+	 * character set. The reader is terminated immediately when socket is down.
 	 * 
 	 * @param hostname
 	 *            The host name which a server socket bind.
@@ -386,14 +409,14 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
-		return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null,
-				"Socket Stream");
+		return socketTextStream(hostname, port, delimiter, 0);
 	}
 
 	/**
 	 * Creates a new DataStream that contains the strings received infinitely
 	 * from socket. Received strings are decoded by the system's default
-	 * character set, uses '\n' as delimiter.
+	 * character set, uses '\n' as delimiter. The reader is terminated immediately
+	 * when socket is down.
 	 * 
 	 * @param hostname
 	 *            The host name which a server socket bind.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd65a241/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index ac82b10..d995522 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -19,25 +19,37 @@ package org.apache.flink.streaming.api.function.source;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SocketTextStreamFunction extends RichSourceFunction<String> {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
+
 	private static final long serialVersionUID = 1L;
 	
 	private String hostname;
 	private int port;
 	private char delimiter;
+	private int maxRetry;
 	private Socket socket;
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
 
 	public SocketTextStreamFunction(String hostname, int port, char delimiter) {
+		this(hostname, port, delimiter, 0);
+	}
+
+	public SocketTextStreamFunction(String hostname, int port, char delimiter, int maxRetry) {
 		this.hostname = hostname;
 		this.port = port;
 		this.delimiter = delimiter;
+		this.maxRetry = maxRetry;
 	}
 
 	@Override
@@ -50,9 +62,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	
 	@Override
 	public void invoke(Collector<String> collector) throws Exception {
-		while (!socket.isClosed() && socket.isConnected()) {
-			streamFromSocket(collector, socket);
-		}
+		streamFromSocket(collector, socket);
 	}
 
 	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
@@ -61,8 +71,30 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 
 		while (true) {
 			int data = reader.read();
-			if (!socket.isConnected() || socket.isClosed() || data == -1) {
-				break;
+			if (data == -1) {
+				socket.close();
+				int retry = 0;
+				boolean success = false;
+				while (retry < maxRetry && !success) {
+					retry++;
+					LOG.warn("Lost connection to server socket. Retrying in 5 seconds...");
+					try {
+						socket = new Socket();
+						socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+						success = true;
+					} catch (ConnectException ce) {
+						Thread.sleep(5000);
+					}
+				}
+
+				if (success) {
+					LOG.info("Server socket is reconnected.");
+				} else {
+					LOG.error("Could not reconnect to server socket.");
+					break;
+				}
+				reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+				continue;
 			}
 
 			if (data == delimiter) {


Mime
View raw message