flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/4] flink git commit: [FLINK-1670] [streaming] Collect function for DataStream
Date Mon, 04 May 2015 09:38:32 GMT
[FLINK-1670] [streaming] Collect function for DataStream

Streams back the results from the job to the client via a TCP socket


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9c9450c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9c9450c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9c9450c

Branch: refs/heads/master
Commit: c9c9450c29f87f938f02e2d71ad20c59abbc6a09
Parents: 1722e83
Author: Gabor Gevay <ggab90@gmail.com>
Authored: Tue Apr 21 20:34:25 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon May 4 10:00:35 2015 +0200

----------------------------------------------------------------------
 flink-contrib/pom.xml                           |   5 +
 .../flink/contrib/streaming/CollectSink.java    | 152 +++++++++++++++++++
 .../contrib/streaming/DataStreamIterator.java   | 134 ++++++++++++++++
 .../contrib/streaming/DataStreamUtils.java      |  86 +++++++++++
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../environment/RemoteStreamEnvironment.java    |  19 +++
 flink-tests/pom.xml                             |   7 +
 .../apache/flink/test/misc/CollectITCase.java   |  60 ++++++++
 8 files changed, 464 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index 8e9aeb1..67cdc8c 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -52,6 +52,11 @@ under the License.
 			<artifactId>flink-clients</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
new file mode 100644
index 0000000..53b84ef
--- /dev/null
+++ b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.InetAddress;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * A specialized data sink to be used by DataStreamUtils.collect.
+ */
+class CollectSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final InetAddress hostIp;
+	private final int port;
+	private final TypeSerializer<IN> serializer;
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+	private StreamWriterDataOutputView streamWriter;
+
+	/**
+	 * Creates a CollectSink that will send the data to the specified host.
+	 *
+	 * @param hostIp IP address of the Socket server.
+	 * @param port Port of the Socket server.
+	 * @param serializer A serializer for the data.
+	 */
+	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
+		this.hostIp = hostIp;
+		this.port = port;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 */
+	public void initializeConnection() {
+		OutputStream outputStream;
+		try {
+			client = new Socket(hostIp, port);
+			outputStream = client.getOutputStream();
+			streamWriter = new StreamWriterDataOutputView(outputStream);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		dataOutputStream = new DataOutputStream(outputStream);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			serializer.serialize(value, streamWriter);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostIp.toString() + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 */
+	private void closeConnection(){
+		try {
+			dataOutputStream.flush();
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostIp.toString() + ":" + port, e);
+		} finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (IOException e) {
+					LOG.error("Cannot close connection with socket server at "
+							+ hostIp.toString() + ":" + port, e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		initializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+	private static class StreamWriterDataOutputView extends DataOutputStream implements DataOutputView {
+
+		public StreamWriterDataOutputView(OutputStream stream) {
+			super(stream);
+		}
+
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			for (int i = 0; i < numBytes; i++) {
+				write(0);
+			}
+		}
+
+		public void write(DataInputView source, int numBytes) throws IOException {
+			byte[] data = new byte[numBytes];
+			source.readFully(data);
+			write(data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
new file mode 100644
index 0000000..a98740e
--- /dev/null
+++ b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.util.Iterator;
+import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.io.DataInputStream;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+class DataStreamIterator<T> implements Iterator<T> {
+
+	ServerSocket socket;
+	InputStream tcpStream;
+	T next;
+	private final CountDownLatch connectionAccepted = new CountDownLatch(1);
+	private volatile StreamReaderDataInputView streamReader;
+	private final TypeSerializer<T> serializer;
+
+	DataStreamIterator(TypeSerializer serializer) {
+		this.serializer = serializer;
+		try {
+			socket = new ServerSocket(0, 1, null);
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator: an I/O error occurred when opening the socket", e);
+		}
+		(new AcceptThread()).start();
+	}
+
+	private class AcceptThread extends Thread {
+		public void run() {
+			try {
+				tcpStream = socket.accept().getInputStream();
+				streamReader = new StreamReaderDataInputView(tcpStream);
+				connectionAccepted.countDown();
+			} catch (IOException e) {
+				throw new RuntimeException("DataStreamIterator.AcceptThread failed", e);
+			}
+		}
+	}
+
+	/**
+	 * Returns the port on which the iterator is getting the data. (Used internally.)
+	 * @return The port
+	 */
+	public int getPort() {
+		return socket.getLocalPort();
+	}
+
+	/**
+	 * Returns true if the DataStream has more elements.
+	 * (Note: blocks if there will be more elements, but they are not available yet.)
+	 * @return true if the DataStream has more elements
+	 */
+	@Override
+	public boolean hasNext() {
+		if (next == null) {
+			readNextFromStream();
+		}
+		return next != null;
+	}
+
+	/**
+	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+	 * @return The element
+	 * @throws NoSuchElementException if the stream has already ended
+	 */
+	@Override
+	public T next() {
+		if (next == null) {
+			readNextFromStream();
+			if (next == null) {
+				throw new NoSuchElementException();
+			}
+		}
+		T current = next;
+		next = null;
+		return current;
+	}
+
+	private void readNextFromStream(){
+		try {
+			connectionAccepted.await();
+		} catch (InterruptedException e) {
+			throw new RuntimeException("The calling thread of DataStreamIterator.readNextFromStream was interrupted.");
+		}
+		try {
+			next = serializer.deserialize(streamReader);
+		} catch (EOFException e) {
+			next = null;
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator could not read from deserializedStream", e);
+		}
+	}
+
+	private static class StreamReaderDataInputView extends DataInputStream implements DataInputView {
+
+		public StreamReaderDataInputView(InputStream stream) {
+			super(stream);
+		}
+
+		public void skipBytesToRead(int numBytes) throws IOException {
+			while (numBytes > 0) {
+				int skipped = skipBytes(numBytes);
+				numBytes -= skipped;
+			}
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
new file mode 100644
index 0000000..276409d
--- /dev/null
+++ b/flink-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.runtime.net.NetUtils;
+
+public class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 * @return The iterator
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+		TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
+		DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer);
+
+		//Find out what IP of us should be given to CollectSink, that it will be able to connect to
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		InetAddress clientAddress;
+		if(env instanceof RemoteStreamEnvironment) {
+			String host = ((RemoteStreamEnvironment)env).getHost();
+			int port = ((RemoteStreamEnvironment)env).getPort();
+			try {
+				clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+			} catch (IOException e) {
+				throw new RuntimeException("IOException while trying to connect to the master", e);
+			}
+		} else {
+			try {
+				clientAddress = InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new RuntimeException("getLocalHost failed", e);
+			}
+		}
+
+		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer));
+		sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
+
+		(new CallExecute(stream)).start();
+
+		return it;
+	}
+
+	private static class CallExecute<OUT> extends Thread {
+
+		DataStream<OUT> stream;
+
+		public CallExecute(DataStream<OUT> stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public void run(){
+			try {
+				stream.getExecutionEnvironment().execute();
+			} catch (Exception e) {
+				throw new RuntimeException("Exception in execute()", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bd5e1ca..e93ec72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1179,6 +1179,7 @@ public class DataStream<OUT> {
 	public DataStreamSink<OUT> writeToSocket(String hostName, int port,
 			SerializationSchema<OUT, byte[]> schema) {
 		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
+		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
 		return returnStream;
 	}
 
@@ -1364,5 +1365,4 @@ public class DataStream<OUT> {
 	public DataStream<OUT> copy() {
 		return new DataStream<OUT>(this);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 50127cf..75e15d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -145,4 +145,23 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 				+ (getParallelism() == -1 ? "default" : getParallelism()) + ")";
 	}
 
+	/**
+	 * Gets the hostname of the master (JobManager), where the
+	 * program will be executed.
+	 *
+	 * @return The hostname of the master
+	 */
+	public String getHost() {
+		return host;
+	}
+
+	/**
+	 * Gets the port of the master (JobManager), where the
+	 * program will be executed.
+	 *
+	 * @return The port of the master
+	 */
+	public int getPort() {
+		return port;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 200522a..146d08c 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -141,6 +141,13 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-contrib</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+
+        <dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_${scala.binary.version}</artifactId>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9c9450c/flink-tests/src/test/java/org/apache/flink/test/misc/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CollectITCase.java
new file mode 100644
index 0000000..61f3392
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CollectITCase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Test;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.junit.Assert;
+
+import java.util.Iterator;
+
+/**
+ * This test verifies the behavior of DataStreamUtils.collect.
+ */
+public class CollectITCase {
+	@Test
+	public void testCollect() {
+
+		Configuration config = new Configuration();
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getJobManagerRPCPort());
+
+		long N = 10;
+		DataStream<Long> stream = env.generateSequence(1, N);
+
+		long i = 1;
+		for(Iterator it = DataStreamUtils.collect(stream); it.hasNext(); ) {
+			Long x = (Long)it.next();
+			//System.out.println("::: " + x.toString());
+			if(x != i) {
+				Assert.fail(String.format("Should have got %d, got %d instead.", i, x));
+			}
+			i++;
+		}
+		if(i != N + 1) {
+			Assert.fail(String.format("Should have collected %d numbers, got %d instead.", N, i - 1));
+		}
+	}
+}


Mime
View raw message