flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/8] flink git commit: [FLINK-1403] [streaming] Distributed filesystem support for streaming filesinks
Date Wed, 21 Jan 2015 17:53:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7df6a3d72 -> 92947f0d8


[FLINK-1403] [streaming] Distributed filesystem support for streaming filesinks

Now streaming filesinks support the same filesystems as the batch ones


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc0d81bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc0d81bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc0d81bb

Branch: refs/heads/master
Commit: dc0d81bb8bc056f9c8def5ce701c1aa121d2a13f
Parents: 7df6a3d
Author: szape <nemderogatorius@gmail.com>
Authored: Tue Jan 13 14:54:01 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Jan 21 16:05:25 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 163 +++++++++++++++----
 .../api/function/sink/FileSinkFunction.java     | 118 ++++++++++++++
 .../function/sink/FileSinkFunctionByMillis.java |  59 +++++++
 .../api/function/sink/RichSinkFunction.java     |   2 +-
 .../api/function/sink/SinkFunction.java         |   2 +-
 5 files changed, 306 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7d1659f..db644e9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -28,15 +29,20 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
@@ -45,12 +51,9 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
 import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.sink.WriteFormat;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -897,20 +900,35 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Writes a DataStream to the file specified by path in text format. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            the path pointing to the location the text file is written to
+	 * 
+	 * @return the closed DataStream.
+	 */
+	public DataStreamSink<OUT> writeAsText(String path) {
+		return writeToFile(new TextOutputFormat<OUT>(new Path(path)), 0L);
+	}
+
+	/**
 	 * Writes a DataStream to the file specified by path in text format. The
 	 * writing is performed periodically, in every millis milliseconds. For
 	 * every element of the DataStream the result of {@link Object#toString()}
 	 * is written.
 	 * 
 	 * @param path
-	 *            is the path to the location where the tuples are written
+	 *            the path pointing to the location the text file is written to
 	 * @param millis
-	 *            is the file update frequency
+	 *            the file update frequency
 	 * 
-	 * @return The closed DataStream
+	 * @return the closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsText(String path, long millis) {
-		return writeAsText(path, new WriteFormatAsText<OUT>(), millis);
+		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
+		return writeToFile(tof, millis);
 	}
 
 	/**
@@ -919,64 +937,137 @@ public class DataStream<OUT> {
 	 * is written.
 	 * 
 	 * @param path
-	 *            is the path to the location where the tuples are written
+	 *            the path pointing to the location the text file is written to
+	 * @param writeMode
+	 *            Controls the behavior for existing files. Options are
+	 *            NO_OVERWRITE and OVERWRITE.
 	 * 
-	 * @return The closed DataStream
+	 * @return the closed DataStream.
 	 */
-	public DataStreamSink<OUT> writeAsText(String path) {
-		return writeAsText(path, 0);
+	public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode) {
+		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
+		tof.setWriteMode(writeMode);
+		return writeToFile(tof, 0L);
 	}
 
 	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds. For
+	 * Writes a DataStream to the file specified by path in text format. For
 	 * every element of the DataStream the result of {@link Object#toString()}
 	 * is written.
 	 * 
 	 * @param path
-	 *            is the path to the location where the tuples are written
+	 *            the path pointing to the location the text file is written to
+	 * @param writeMode
+	 *            Controls the behavior for existing files. Options are
+	 *            NO_OVERWRITE and OVERWRITE.
 	 * @param millis
-	 *            is the file update frequency
+	 *            the file update frequency
 	 * 
-	 * @return The closed DataStream
+	 * @return the closed DataStream.
 	 */
-	public DataStreamSink<OUT> writeAsCsv(String path, long millis) {
-		if (!getType().isTupleType()) {
-			throw new RuntimeException("Only tuple data streams can be written in csv format");
-		}
-		return writeAsText(path, new WriteFormatAsCsv<OUT>(), millis);
+	public DataStreamSink<OUT> writeAsText(String path, WriteMode writeMode, long millis) {
+		TextOutputFormat<OUT> tof = new TextOutputFormat<OUT>(new Path(path));
+		tof.setWriteMode(writeMode);
+		return writeToFile(tof, millis);
 	}
 
 	/**
-	 * Writes a DataStream to the file specified by path in text format. For
+	 * Writes a DataStream to the file specified by path in csv format. For
 	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
+	 * is written. This method can only be used on data streams of tuples.
 	 * 
 	 * @param path
-	 *            is the path to the location where the tuples are written
+	 *            the path pointing to the location the text file is written to
 	 * 
-	 * @return The closed DataStream
+	 * @return the closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsCsv(String path) {
-		return writeAsCsv(path, 0);
+	@SuppressWarnings("unchecked")
+	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path) {
+		Validate.isTrue(getType().isTupleType(),
+				"The writeAsCsv() method can only be used on data sets of tuples.");
+		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+		return writeToFile((OutputFormat<OUT>) of, 0L);
 	}
 
 	/**
-	 * Writes a DataStream to the file specified by path in text format. The
+	 * Writes a DataStream to the file specified by path in csv format. The
 	 * writing is performed periodically, in every millis milliseconds. For
 	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
+	 * is written. This method can only be used on data streams of tuples.
 	 * 
 	 * @param path
-	 *            is the path to the location where the tuples are written
+	 *            the path pointing to the location the text file is written to
 	 * @param millis
-	 *            is the file update frequency
+	 *            the file update frequency
 	 * 
-	 * @return the data stream constructed
+	 * @return the closed DataStream
 	 */
-	private DataStreamSink<OUT> writeAsText(String path, WriteFormat<OUT> format, long millis) {
-		DataStreamSink<OUT> returnStream = addSink(new WriteSinkFunctionByMillis<OUT>(path, format,
-				millis));
+	@SuppressWarnings("unchecked")
+	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, long millis) {
+		Validate.isTrue(getType().isTupleType(),
+				"The writeAsCsv() method can only be used on data sets of tuples.");
+		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+		return writeToFile((OutputFormat<OUT>) of, millis);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written. This method can only be used on data streams of tuples.
+	 * 
+	 * @param path
+	 *            the path pointing to the location the text file is written to
+	 * @param writeMode
+	 *            Controls the behavior for existing files. Options are
+	 *            NO_OVERWRITE and OVERWRITE.
+	 * 
+	 * @return the closed DataStream
+	 */
+	@SuppressWarnings("unchecked")
+	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode) {
+		Validate.isTrue(getType().isTupleType(),
+				"The writeAsCsv() method can only be used on data sets of tuples.");
+		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+		if (writeMode != null) {
+			of.setWriteMode(writeMode);
+		}
+		return writeToFile((OutputFormat<OUT>) of, 0L);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, in every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written. This method can only be used on data streams of tuples.
+	 * 
+	 * @param path
+	 *            the path pointing to the location the text file is written to
+	 * @param writeMode
+	 *            Controls the behavior for existing files. Options are
+	 *            NO_OVERWRITE and OVERWRITE.
+	 * @param millis
+	 *            the file update frequency
+	 * 
+	 * @return the closed DataStream
+	 */
+	@SuppressWarnings("unchecked")
+	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode,
+			long millis) {
+		Validate.isTrue(getType().isTupleType(),
+				"The writeAsCsv() method can only be used on data sets of tuples.");
+		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+		if (writeMode != null) {
+			of.setWriteMode(writeMode);
+		}
+		return writeToFile((OutputFormat<OUT>) of, millis);
+	}
+
+	private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
+		DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
new file mode 100644
index 0000000..24beba1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples in the specified
+ * OutputFormat format. Tuples are collected to a list and written to the file
+ * periodically. The target path and the overwrite mode are pre-packaged in
+ * format.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
+	protected ArrayList<IN> tupleList = new ArrayList<IN>();
+	protected volatile OutputFormat<IN> format;
+	protected volatile boolean cleanupCalled = false;
+	protected int indexInSubtaskGroup;
+	protected int currentNumberOfSubtasks;
+
+	public FileSinkFunction(OutputFormat<IN> format) {
+		this.format = format;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		format.configure(context.getTaskStubParameters());
+		indexInSubtaskGroup = context.getIndexOfThisSubtask();
+		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
+		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
+	}
+
+	@Override
+	public void invoke(IN record) throws Exception {
+		tupleList.add(record);
+		if (updateCondition()) {
+			flush();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (!tupleList.isEmpty()) {
+			flush();
+		}
+		try {
+			format.close();
+		} catch (Exception ex) {
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			} catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
+		}
+	}
+
+	protected void flush() {
+		try {
+			for (IN rec : tupleList) {
+				format.writeRecord(rec);
+			}
+		} catch (Exception ex) {
+			try {
+				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+					cleanupCalled = true;
+					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+				}
+			} catch (Throwable t) {
+				LOG.error("Cleanup on error failed.", t);
+			}
+		}
+		resetParameters();
+	}
+
+	/**
+	 * Condition for writing the contents of tupleList and clearing it.
+	 * 
+	 * @return value of the updating condition
+	 */
+	protected abstract boolean updateCondition();
+
+	/**
+	 * Statements to be executed after writing a batch go here.
+	 */
+	protected abstract void resetParameters();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
new file mode 100644
index 0000000..f049a32
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * Implementation of FileSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private final long millis;
+	private long lastTime;
+
+	public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
+		super(format);
+		this.millis = millis;
+		lastTime = System.currentTimeMillis();
+	}
+
+	/**
+	 * Condition for writing the contents of tupleList and clearing it.
+	 * 
+	 * @return value of the updating condition
+	 */
+	@Override
+	protected boolean updateCondition() {
+		return System.currentTimeMillis() - lastTime >= millis;
+	}
+
+	/**
+	 * Statements to be executed after writing a batch go here.
+	 */
+	@Override
+	protected void resetParameters() {
+		tupleList.clear();
+		lastTime = System.currentTimeMillis();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
index b89c169..3b8a4db 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
@@ -23,6 +23,6 @@ public abstract class RichSinkFunction<IN> extends AbstractRichFunction implemen
 
 	private static final long serialVersionUID = 1L;
 
-	public abstract void invoke(IN value);
+	public abstract void invoke(IN value) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0d81bb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 9db268e..6097603 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -23,6 +23,6 @@ import org.apache.flink.api.common.functions.Function;
 
 public interface SinkFunction<IN> extends Function, Serializable {
 
-	public abstract void invoke(IN value);
+	public abstract void invoke(IN value) throws Exception;
 
 }


Mime
View raw message