flink-commits mailing list archives

From aljos...@apache.org
Subject [2/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent
Date Tue, 14 Jun 2016 16:12:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
-	 * read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The file will be read with the system's default character set.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readTextFile(filePath, "UTF-8");
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
-	 * java.nio.charset.Charset} with the given name will be used to read the files.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
@@ -902,15 +916,32 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);
 
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1,
+			FilePathFilter.createDefaultFilter(), typeInfo);
 	}
 
 	/**
-	 * Reads the given file with the given imput format.
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+	 *
+	 * <p>
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
@@ -920,19 +951,64 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data read from the given file
 	 */
-	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
-		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
-		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath) {
+		return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+	}
 
-		inputFormat.setFilePath(new Path(filePath));
+	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
+	 * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
+	 * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and
+	 * exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, the user
+	 * can specify a custom {@link FilePathFilter}. As a default implementation you can use
+	 * {@link FilePathFilter#createDefaultFilter()}.
+	 *
+	 * <p>
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+	 * @param interval
+	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param filter
+	 * 		The files to be excluded from the processing
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
+	 */
+	@PublicEvolving
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												FilePathFilter filter) {
+
+		TypeInformation<OUT> typeInformation;
 		try {
-			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Read File source");
+			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
 		} catch (Exception e) {
-			throw new InvalidProgramException("The type returned by the input format could not be automatically " +
-					"determined. " +
-					"Please specify the TypeInformation of the produced type explicitly by using the " +
-					"'createInput(InputFormat, TypeInformation)' method instead.");
+			throw new InvalidProgramException("The type returned by the input format could not be " +
+				"automatically determined. Please specify the TypeInformation of the produced type " +
+				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
 		}
+		return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation);
 	}
 
 	/**
@@ -952,15 +1028,62 @@ public abstract class StreamExecutionEnvironment {
 	 * 		of files.
 	 * @return The DataStream containing the given directory.
 	 */
+	@Deprecated
 	public DataStream<String> readFileStream(String filePath, long intervalMillis,
-											WatchType watchType) {
+											FileMonitoringFunction.WatchType watchType) {
 		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-				filePath, intervalMillis, watchType), "Read File Stream source");
+			filePath, intervalMillis, watchType), "Read File Stream source");
 
 		return source.flatMap(new FileReadFunction());
 	}
 
 	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+	 * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
+	 * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the
+	 * path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed,
+	 * the user can specify a custom {@link FilePathFilter}. As a default implementation you can use
+	 * {@link FilePathFilter#createDefaultFilter()}.
+	 *
+	 * <p>
+	 *  <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+	 * @param filter
+	 * 		The files to be excluded from the processing
+	 * @param typeInformation
+	 * 		Information on the type of the elements in the output stream
+	 * @param interval
+	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
+	 */
+	@PublicEvolving
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												FilePathFilter filter,
+												TypeInformation<OUT> typeInformation) {
+
+		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+
+		inputFormat.setFilePath(filePath);
+		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval);
+	}
+
+	/**
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.
@@ -1026,12 +1149,20 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
 	 * <p>
-	 * Since all data streams need specific information about their types, this method needs to determine the type of
-	 * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
-	 * input
-	 * format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the latter
-	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()}
-	 * method to determine data type produced by the input format.
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+	 * without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints.
 	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
@@ -1041,35 +1172,84 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
-		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
+		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
 	}
 
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
 	 * <p>
-	 * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
-	 * return
-	 * type cannot be determined by reflection analysis, and that do not implement the
+	 * The data stream is typed to the given TypeInformation. This method is intended for input formats
+	 * where the return type cannot be determined by reflection analysis, and that do not implement the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+	 * without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints.
+	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
+	 * @param typeInfo
+	 * 		The information about the type of the output type
 	 * @param <OUT>
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data created by the input format
 	 */
 	@PublicEvolving
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
-		return createInput(inputFormat, typeInfo, "Custom File source");
+		DataStreamSource<OUT> source;
+
+		if (inputFormat instanceof FileInputFormat) {
+			FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
+			source = createFileInput(format, typeInfo, "Custom File source",
+				FileProcessingMode.PROCESS_ONCE,
+				FilePathFilter.createDefaultFilter(),  -1);
+		} else {
+			source = createInput(inputFormat, typeInfo, "Custom Source");
+		}
+		return source;
 	}
 
-	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
-			TypeInformation<OUT> typeInfo, String sourceName) {
-		FileSourceFunction<OUT> function = new FileSourceFunction<>(inputFormat, typeInfo);
+													TypeInformation<OUT> typeInfo,
+													String sourceName) {
+
+		InputFormatSource<OUT> function = new InputFormatSource<>(inputFormat, typeInfo);
 		return addSource(function, sourceName, typeInfo);
 	}
 
+	private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
+														TypeInformation<OUT> typeInfo,
+														String sourceName,
+														FileProcessingMode watchType,
+														FilePathFilter pathFilter,
+														long interval) {
+
+		Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
+		Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
+		Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
+		Preconditions.checkNotNull(watchType, "Unspecified watchtype.");
+		Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function.");
+
+		Preconditions.checkArgument(watchType.equals(FileProcessingMode.PROCESS_ONCE) ||
+			interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+			"The path monitoring interval cannot be less than 100 ms.");
+
+		ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
+			inputFormat, inputFormat.getFilePath().toString(),
+			pathFilter, watchType, getParallelism(), interval);
+
+		ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
+
+		SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
+			.transform("FileSplitReader_" + sourceName, typeInfo, reader);
+
+		return new DataStreamSource<>(source);
+	}
+
 	/**
 	 * Adds a Data Source to the streaming topology.
 	 *

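A minimal usage sketch of the new readFile(...) API added above; the HDFS path, host name, and the 10-second scan interval are illustrative assumptions, not part of this patch:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousTextFileJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The format decides how records are read; the path below is monitored by the
		// ContinuousFileMonitoringFunction and read by the ContinuousFileReaderOperator.
		TextInputFormat format = new TextInputFormat(new Path("hdfs://namenode:9000/logs"));
		format.setCharsetName("UTF-8");

		DataStream<String> lines = env.readFile(
			format,
			"hdfs://namenode:9000/logs",               // path to monitor (illustrative)
			FileProcessingMode.PROCESS_CONTINUOUSLY,   // re-scan the path periodically
			10000L,                                    // scan interval in ms, must be >= 100
			FilePathFilter.createDefaultFilter());     // skips ".", "_" and "_COPYING_" files

		lines.print();
		env.execute("Continuously read a text directory");
	}
}
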
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
new file mode 100644
index 0000000..b97c274
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileProcessingMode} and the {@link FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction<OUT>
+	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+	/**
+	 * The minimum interval allowed between consecutive path scans. This is applicable if the
+	 * {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+	 */
+	public static final long MIN_MONITORING_INTERVAL = 100L;
+
+	/** The path to monitor. */
+	private final String path;
+
+	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+	private final int readerParallelism;
+
+	/** The {@link FileInputFormat} to be read. */
+	private FileInputFormat<OUT> format;
+
+	/** How often to monitor the state of the directory for new data. */
+	private final long interval;
+
+	/** Which new data to process (see {@link FileProcessingMode}). */
+	private final FileProcessingMode watchType;
+
+	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
+
+	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
+
+	private long globalModificationTime;
+
+	private FilePathFilter pathFilter;
+
+	private volatile boolean isRunning = true;
+
+	public ContinuousFileMonitoringFunction(
+		FileInputFormat<OUT> format, String path,
+		FilePathFilter filter, FileProcessingMode watchType,
+		int readerParallelism, long interval) {
+
+		if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
+			throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is " +
+				"smaller than the minimum allowed one (100 ms).");
+		}
+		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
+		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+		this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
+
+		this.interval = interval;
+		this.watchType = watchType;
+		this.readerParallelism = Math.max(readerParallelism, 1);
+		this.globalModificationTime = Long.MIN_VALUE;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open(Configuration parameters) throws Exception {
+		LOG.info("Opening File Monitoring Source.");
+		
+		super.open(parameters);
+		format.configure(parameters);
+	}
+
+	@Override
+	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
+		FileSystem fileSystem = FileSystem.get(new URI(path));
+
+		switch (watchType) {
+			case PROCESS_CONTINUOUSLY:
+				while (isRunning) {
+					monitorDirAndForwardSplits(fileSystem, context);
+					Thread.sleep(interval);
+				}
+				isRunning = false;
+				break;
+			case PROCESS_ONCE:
+				monitorDirAndForwardSplits(fileSystem, context);
+				isRunning = false;
+				break;
+			default:
+				isRunning = false;
+				throw new RuntimeException("Unknown WatchType: " + watchType);
+		}
+	}
+
+	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
+		final Object lock = context.getCheckpointLock();
+
+		// it may be non-null in the case of a recovery after a failure.
+		if (currentSplitsToFwd != null) {
+			synchronized (lock) {
+				forwardSplits(currentSplitsToFwd, context);
+			}
+		}
+		currentSplitsToFwd = null;
+
+		// it may be non-null in the case of a recovery after a failure.
+		if (splitsToFwdOrderedAscByModTime == null) {
+			splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
+		}
+
+		Iterator<Tuple2<Long, List<FileInputSplit>>> it =
+			splitsToFwdOrderedAscByModTime.iterator();
+
+		while (it.hasNext()) {
+			synchronized (lock) {
+				currentSplitsToFwd = it.next();
+				it.remove();
+				forwardSplits(currentSplitsToFwd, context);
+			}
+		}
+
+		// set them to null to distinguish from a restore.
+		splitsToFwdOrderedAscByModTime = null;
+		currentSplitsToFwd = null;
+	}
+
+	private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
+		currentSplitsToFwd = splitsToFwd;
+		Long modTime = currentSplitsToFwd.f0;
+		List<FileInputSplit> splits = currentSplitsToFwd.f1;
+
+		Iterator<FileInputSplit> it = splits.iterator();
+		while (it.hasNext()) {
+			FileInputSplit split = it.next();
+			processSplit(split, context);
+			it.remove();
+		}
+
+		// update the global modification time
+		if (modTime >= globalModificationTime) {
+			globalModificationTime = modTime;
+		}
+	}
+
+	private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> context) {
+		LOG.info("Forwarding split: " + split);
+		context.collect(split);
+	}
+
+	private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
+		List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
+		if (eligibleFiles.isEmpty()) {
+			return new ArrayList<>();
+		}
+
+		Map<Long, List<FileInputSplit>> splitsToForward = getInputSplits(eligibleFiles);
+		List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<>();
+
+		for (Map.Entry<Long, List<FileInputSplit>> entry : splitsToForward.entrySet()) {
+			sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+		}
+
+		Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
+			@Override
+			public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
+				return (int) (o1.f0 - o2.f0);
+			}
+		});
+
+		return sortedSplitsToForward;
+	}
+
+	/**
+	 * Creates the input splits for the path to be forwarded to the downstream tasks of the
+	 * {@link ContinuousFileReaderOperator}. Those tasks are going to read their contents for further
+	 * processing. Splits belonging to files in the {@code eligibleFiles} list are the ones
+	 * that are shipped for further processing.
+	 * @param eligibleFiles The files to process.
+	 */
+	private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
+		if (eligibleFiles.isEmpty()) {
+			return new HashMap<>();
+		}
+
+		FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
+
+		Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
+		for (FileInputSplit split: inputSplits) {
+			for (FileStatus file: eligibleFiles) {
+				if (file.getPath().equals(split.getPath())) {
+					Long modTime = file.getModificationTime();
+
+					List<FileInputSplit> splitsToForward = splitsPerFile.get(modTime);
+					if (splitsToForward == null) {
+						splitsToForward = new LinkedList<>();
+						splitsPerFile.put(modTime, splitsToForward);
+					}
+					splitsToForward.add(split);
+					break;
+				}
+			}
+		}
+		return splitsPerFile;
+	}
+
+	/**
+	 * Returns the files that have data to be processed. This method returns the
+	 * Paths to the aforementioned files. It is up to the {@link #processSplit(FileInputSplit, SourceContext)}
+	 * method to decide which parts of the file are to be processed, and to forward them downstream.
+	 */
+	private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+		List<FileStatus> files = new ArrayList<>();
+
+		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+		if (statuses == null) {
+			LOG.warn("Path does not exist: {}", path);
+		} else {
+			// handle the new files
+			for (FileStatus status : statuses) {
+				Path filePath = status.getPath();
+				long modificationTime = status.getModificationTime();
+				if (!shouldIgnore(filePath, modificationTime)) {
+					files.add(status);
+				}
+			}
+		}
+		return files;
+	}
+
+	/**
+	 * Returns {@code true} if the file is NOT to be processed further.
+	 * This happens in the following cases:
+	 *
+	 * If the user-specified path filtering method returns {@code true} for the file,
+	 * or if the modification time of the file is smaller than the {@link #globalModificationTime}, which
+	 * is the time of the most recent modification found in any of the already processed files.
+	 */
+	private boolean shouldIgnore(Path filePath, long modificationTime) {
+		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+		if (shouldIgnore) {
+			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
+		}
+		return  shouldIgnore;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		isRunning = false;
+		LOG.info("Closed File Monitoring Source.");
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+	//	---------------------			Checkpointing			--------------------------
+
+	@Override
+	public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
+		long checkpointId, long checkpointTimestamp) throws Exception {
+
+		if (!isRunning) {
+			LOG.debug("snapshotState() called on closed source");
+			return null;
+		}
+		return new Tuple3<>(splitsToFwdOrderedAscByModTime,
+			currentSplitsToFwd, globalModificationTime);
+	}
+
+	@Override
+	public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
+		Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
+
+		this.splitsToFwdOrderedAscByModTime = state.f0;
+		this.currentSplitsToFwd = state.f1;
+		this.globalModificationTime = state.f2;
+	}
+}

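A small self-contained sketch (not part of this commit) of the modification-time rule implemented by shouldIgnore() above: a file is forwarded only while its modification time is strictly greater than the largest modification time already forwarded, which is also why PROCESS_CONTINUOUSLY re-reads a whole file after an append. The file name and timestamps are made up:

public class ModTimeRuleSketch {

	public static void main(String[] args) {
		long globalModificationTime = Long.MIN_VALUE;

		long firstScan = 1000L;                                     // "a.txt" seen on the first scan
		System.out.println(firstScan > globalModificationTime);    // true  -> its splits are forwarded
		globalModificationTime = Math.max(globalModificationTime, firstScan);

		long secondScan = 1000L;                                    // unchanged file on the next scan
		System.out.println(secondScan > globalModificationTime);   // false -> ignored

		long afterAppend = 2000L;                                   // an append bumped the mod time
		System.out.println(afterAppend > globalModificationTime);  // true  -> whole file re-read
	}
}
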
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
new file mode 100644
index 0000000..4d4a792
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors
+ * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another
+ * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from
+ * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()}
+ * so that the checkpoints reflect the current state.
+ */
+@Internal
+public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
+
+	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
+
+	private transient SplitReader<S, OUT> reader;
+	private transient TimestampedCollector<OUT> collector;
+
+	private FileInputFormat<OUT> format;
+	private TypeSerializer<OUT> serializer;
+
+	private Object checkpointLock;
+
+	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+
+	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
+		this.format = checkNotNull(format);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		this.serializer = outTypeInfo.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		if (this.serializer == null) {
+			throw new IllegalStateException("The serializer has not been set. " +
+				"Probably the setOutputType() was not called and this should not have happened. " +
+				"Please report it.");
+		}
+
+		this.format.configure(new Configuration());
+		this.collector = new TimestampedCollector<>(output);
+		this.checkpointLock = getContainingTask().getCheckpointLock();
+
+		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+		this.reader.start();
+	}
+
+	@Override
+	public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+		reader.addSplit(element.getValue());
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void dispose() {
+		super.dispose();
+
+		// first try to cancel it properly and
+		// give it some time until it finishes
+		reader.cancel();
+		try {
+			reader.join(200);
+		} catch (InterruptedException e) {
+			// we can ignore this
+		}
+
+		// if the above did not work, then interrupt the thread repeatedly
+		while (reader.isAlive()) {
+
+			StringBuilder bld = new StringBuilder();
+			StackTraceElement[] stack = reader.getStackTrace();
+			for (StackTraceElement e : stack) {
+				bld.append(e).append('\n');
+			}
+			LOG.warn("The reader is stuck in method:\n {}", bld.toString());
+
+			reader.interrupt();
+			try {
+				reader.join(50);
+			} catch (InterruptedException e) {
+				// we can ignore this
+			}
+		}
+		reader = null;
+		collector = null;
+		format = null;
+		serializer = null;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+
+		// signal that no more splits will come, wait for the reader to finish
+		// and close the collector. Further cleaning up is handled by the dispose().
+
+		if (reader != null && reader.isAlive() && reader.isRunning()) {
+			// add a dummy element to signal that no more splits will
+			// arrive and wait until the reader finishes
+			reader.addSplit(EOS);
+
+			// we already have the checkpoint lock because close() is
+			// called by the StreamTask while having it.
+			checkpointLock.wait();
+		}
+		collector.close();
+	}
+
+	private class SplitReader<S extends Serializable, OT> extends Thread {
+
+		private volatile boolean isRunning;
+
+		private final FileInputFormat<OT> format;
+		private final TypeSerializer<OT> serializer;
+
+		private final Object checkpointLock;
+		private final TimestampedCollector<OT> collector;
+
+		private final Queue<FileInputSplit> pendingSplits;
+
+		private FileInputSplit currentSplit = null;
+
+		private S restoredFormatState = null;
+
+		SplitReader(FileInputFormat<OT> format,
+					TypeSerializer<OT> serializer,
+					TimestampedCollector<OT> collector,
+					Object checkpointLock,
+					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+
+			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
+			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+
+			this.pendingSplits = new LinkedList<>();
+			this.collector = collector;
+			this.checkpointLock = checkpointLock;
+			this.isRunning = true;
+
+			// this is the case where a task recovers from a previous failed attempt
+			if (restoredState != null) {
+				List<FileInputSplit> pending = restoredState.f0;
+				FileInputSplit current = restoredState.f1;
+				S formatState = restoredState.f2;
+
+				for (FileInputSplit split : pending) {
+					pendingSplits.add(split);
+				}
+
+				this.currentSplit = current;
+				this.restoredFormatState = formatState;
+			}
+			ContinuousFileReaderOperator.this.readerState = null;
+		}
+
+		void addSplit(FileInputSplit split) {
+			Preconditions.checkNotNull(split);
+			synchronized (checkpointLock) {
+				this.pendingSplits.add(split);
+			}
+		}
+
+		public boolean isRunning() {
+			return this.isRunning;
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (this.isRunning) {
+
+					synchronized (checkpointLock) {
+						if (this.currentSplit != null) {
+
+							if (currentSplit.equals(EOS)) {
+								isRunning = false;
+								break;
+							}
+
+							if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
+								((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+							} else {
+								// this is the case of a non-checkpointable input format that will reprocess the last split.
+								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+								format.open(currentSplit);
+							}
+							// reset the restored state to null for the next iteration
+							this.restoredFormatState = null;
+						} else {
+
+							// get the next split to read.
+							currentSplit = this.pendingSplits.poll();
+
+							if (currentSplit == null) {
+								checkpointLock.wait(50);
+								continue;
+							}
+
+							if (currentSplit.equals(EOS)) {
+								isRunning = false;
+								break;
+							}
+							this.format.open(currentSplit);
+						}
+					}
+
+					LOG.info("Reading split: " + currentSplit);
+
+					try {
+						OT nextElement = serializer.createInstance();
+						while (!format.reachedEnd()) {
+							synchronized (checkpointLock) {
+								nextElement = format.nextRecord(nextElement);
+								if (nextElement != null) {
+									collector.collect(nextElement);
+								} else {
+									break;
+								}
+							}
+						}
+
+					} finally {
+						// close and prepare for the next iteration
+						this.format.close();
+						this.currentSplit = null;
+					}
+				}
+
+			} catch (Throwable e) {
+				if (isRunning) {
+					LOG.error("Caught exception processing split: " + currentSplit, e);
+				}
+				getContainingTask().failExternally(e);
+			} finally {
+				synchronized (checkpointLock) {
+					LOG.info("Reader terminated, and exiting...");
+					checkpointLock.notifyAll();
+				}
+			}
+		}
+
+		Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+			List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
+			for (FileInputSplit split: this.pendingSplits) {
+				snapshot.add(split);
+			}
+
+			// remove the current split from the list if inside.
+			if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
+				this.pendingSplits.remove();
+			}
+
+			if (this.format instanceof CheckpointableInputFormat) {
+				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
+				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+			} else {
+				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+				return new Tuple3<>(snapshot, currentSplit, null);
+			}
+		}
+
+		public void cancel() {
+			this.isRunning = false;
+		}
+	}
+
+	//	---------------------			Checkpointing			--------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		final AbstractStateBackend.CheckpointStateOutputStream os =
+			this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+
+		final ObjectOutputStream oos = new ObjectOutputStream(os);
+		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+		Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
+		List<FileInputSplit> pendingSplits = readerState.f0;
+		FileInputSplit currSplit = readerState.f1;
+		S formatState = readerState.f2;
+
+		// write the current split
+		oos.writeObject(currSplit);
+
+		// write the pending ones
+		ov.writeInt(pendingSplits.size());
+		for (FileInputSplit split : pendingSplits) {
+			oos.writeObject(split);
+		}
+
+		// write the state of the reading channel
+		oos.writeObject(formatState);
+		taskState.setOperatorState(os.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		StreamStateHandle stream = (StreamStateHandle) state.getOperatorState();
+
+		final InputStream is = stream.getState(getUserCodeClassloader());
+		final ObjectInputStream ois = new ObjectInputStream(is);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
+
+		// read the split that was being read
+		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+
+		// read the pending splits list
+		List<FileInputSplit> pendingSplits = new LinkedList<>();
+		int noOfSplits = div.readInt();
+		for (int i = 0; i < noOfSplits; i++) {
+			FileInputSplit split = (FileInputSplit) ois.readObject();
+			pendingSplits.add(split);
+		}
+
+		// read the state of the format
+		S formatState = (S) ois.readObject();
+
+		// set the whole reader state for the open() to find.
+		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
+		div.close();
+	}
+}

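The class-level comment above describes why records are emitted from a separate SplitReader thread that synchronizes with checkpointing on the task's checkpoint lock. The following self-contained sketch (plain Java, not Flink code) shows that pattern in isolation: the reader thread only emits and updates state while holding the lock, and the snapshot path takes the same lock, so a snapshot never observes a half-applied update:

public class CheckpointLockSketch {

	private final Object checkpointLock = new Object();
	private long emittedRecords = 0;                 // stands in for the operator state

	private void readerLoop() throws InterruptedException {
		for (int i = 0; i < 1000; i++) {
			synchronized (checkpointLock) {
				emittedRecords++;                    // "emit" + state update, atomic w.r.t. snapshots
			}
			Thread.sleep(1);
		}
	}

	private long snapshot() {
		synchronized (checkpointLock) {
			return emittedRecords;                   // always a consistent, between-records value
		}
	}

	public static void main(String[] args) throws Exception {
		final CheckpointLockSketch sketch = new CheckpointLockSketch();
		Thread reader = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					sketch.readerLoop();
				} catch (InterruptedException ignored) {
				}
			}
		});
		reader.start();
		System.out.println("snapshot sees " + sketch.snapshot() + " records");
		reader.join();
	}
}
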
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index fc24079..06da8c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -32,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@PublicEvolving
+@Deprecated
 public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
new file mode 100644
index 0000000..1a359ab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * An abstract class to be extended by the user when using the {@link ContinuousFileMonitoringFunction}.
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+	public static FilePathFilter createDefaultFilter() {
+		return new DefaultFilter();
+	}
+	/**
+	 * Returns {@code true} if the {@code filePath} given is to be
+	 * ignored when processing a directory, e.g.
+	 * <pre>
+	 * {@code
+	 *
+	 * public boolean filterPath(Path filePath) {
+	 *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+	 * }
+	 * }</pre>
+	 */
+	public abstract boolean filterPath(Path filePath);
+
+	/**
+	 * The default file path filtering method, used
+	 * if no other filter is provided. It excludes files whose names
+	 * start with "." or "_", or contain "_COPYING_".
+	 */
+	public static class DefaultFilter extends FilePathFilter {
+
+		DefaultFilter() {}
+
+		@Override
+		public boolean filterPath(Path filePath) {
+			return filePath == null ||
+				filePath.getName().startsWith(".") ||
+				filePath.getName().startsWith("_") ||
+				filePath.getName().contains("_COPYING_");
+		}
+	}
+}

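As a concrete illustration of the contract documented above (return true for paths that should be ignored), a user-defined filter might look like the following sketch; the ".tmp" suffix is an illustrative assumption, not something this commit prescribes:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;

public class IgnoreTmpFilesFilter extends FilePathFilter {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean filterPath(Path filePath) {
		// ignore null paths, hidden files, and files still being written under a ".tmp" suffix
		return filePath == null
			|| filePath.getName().startsWith(".")
			|| filePath.getName().endsWith(".tmp");
	}
}

Such a filter would then be passed as the filter argument of StreamExecutionEnvironment#readFile(...) in place of FilePathFilter.createDefaultFilter().
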
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
new file mode 100644
index 0000000..cdbeb2b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Specifies when the computation of the {@link ContinuousFileMonitoringFunction}
+ * will be triggered.
+ */
+@PublicEvolving
+public enum FileProcessingMode {
+
+	PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
+	PROCESS_CONTINUOUSLY		// Reprocesses the whole file when new data is appended.
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index 0f78826..ac1e834 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.URI;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -29,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
-@PublicEvolving
+@Deprecated
 public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
deleted file mode 100644
index 0dcb9ff..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-@PublicEvolving
-public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private TypeInformation<OUT> typeInfo;
-	private transient TypeSerializer<OUT> serializer;
-
-	private InputFormat<OUT, InputSplit> format;
-
-	private transient InputSplitProvider provider;
-	private transient Iterator<InputSplit> splitIterator;
-
-	private volatile boolean isRunning = true;
-
-	@SuppressWarnings("unchecked")
-	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
-		this.format = (InputFormat<OUT, InputSplit>) format;
-		this.typeInfo = typeInfo;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		this.provider = context.getInputSplitProvider();
-		
-		format.configure(parameters);
-		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-		splitIterator = getInputSplits();
-		if (splitIterator.hasNext()) {
-			format.open(splitIterator.next());
-		}
-		isRunning = true;
-	}
-
-	@Override
-	public void close() throws Exception {
-		format.close();
-	}
-
-	private Iterator<InputSplit> getInputSplits() {
-
-		return new Iterator<InputSplit>() {
-
-			private InputSplit nextSplit;
-
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-
-				if (nextSplit != null) {
-					return true;
-				}
-
-				InputSplit split = provider.getNextInputSplit();
-
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				} else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public InputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final InputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		while (isRunning) {
-			OUT nextElement = serializer.createInstance();
-			nextElement =  format.nextRecord(nextElement);
-			if (nextElement == null && splitIterator.hasNext()) {
-				format.open(splitIterator.next());
-				continue;
-			} else if (nextElement == null) {
-				break;
-			}
-			ctx.collect(nextElement);
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-
-	/**
-	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
-	 * split assigner on the {@code StreamGraph}.
-	 */
-	public InputFormat<OUT, InputSplit> getFormat() {
-		return format;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
new file mode 100644
index 0000000..2a84781
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+@Internal
+public class InputFormatSource<OUT> extends RichParallelSourceFunction<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private TypeInformation<OUT> typeInfo;
+	private transient TypeSerializer<OUT> serializer;
+
+	private InputFormat<OUT, InputSplit> format;
+
+	private transient InputSplitProvider provider;
+	private transient Iterator<InputSplit> splitIterator;
+
+	private volatile boolean isRunning = true;
+
+	@SuppressWarnings("unchecked")
+	public InputFormatSource(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
+		this.format = (InputFormat<OUT, InputSplit>) format;
+		this.typeInfo = typeInfo;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		this.provider = context.getInputSplitProvider();
+		
+		format.configure(parameters);
+		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+		splitIterator = getInputSplits();
+		if (splitIterator.hasNext()) {
+			format.open(splitIterator.next());
+		}
+		isRunning = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		format.close();
+	}
+
+	private Iterator<InputSplit> getInputSplits() {
+
+		return new Iterator<InputSplit>() {
+
+			private InputSplit nextSplit;
+
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+
+				if (nextSplit != null) {
+					return true;
+				}
+
+				InputSplit split = provider.getNextInputSplit();
+
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				} else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public InputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final InputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+
+	@Override
+	public void run(SourceContext<OUT> ctx) throws Exception {
+		while (isRunning) {
+			OUT nextElement = serializer.createInstance();
+			nextElement =  format.nextRecord(nextElement);
+			if (nextElement == null && splitIterator.hasNext()) {
+				format.open(splitIterator.next());
+				continue;
+			} else if (nextElement == null) {
+				break;
+			}
+			ctx.collect(nextElement);
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+
+	/**
+	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
+	 * split assigner on the {@code StreamGraph}.
+	 */
+	public InputFormat<OUT, InputSplit> getFormat() {
+		return format;
+	}
+}
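
Note that InputFormatSource is annotated @Internal: user programs are not expected to instantiate it themselves. The hedged sketch below only illustrates what the class does, wiring a plain InputFormat into a stream; the file path is a made-up example, and in real jobs the environment creates this wrapper internally.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.InputFormatSource;

    public class InputFormatSourceSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input"));

            // The source reads records split by split; the StreamGraphGenerator change below
            // registers the format on the StreamGraph so input splits can be assigned at runtime.
            DataStream<String> lines = env.addSource(
                    new InputFormatSource<>(format, BasicTypeInfo.STRING_TYPE_INFO),
                    "input-format-source",
                    BasicTypeInfo.STRING_TYPE_INFO);

            lines.print();
            env.execute("InputFormatSource sketch");
        }
    }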

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 70c5cff..685655e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -425,8 +425,8 @@ public class StreamGraphGenerator {
 				null,
 				source.getOutputType(),
 				"Source: " + source.getName());
-		if (source.getOperator().getUserFunction() instanceof FileSourceFunction) {
-			FileSourceFunction<T> fs = (FileSourceFunction<T>) source.getOperator().getUserFunction();
+		if (source.getOperator().getUserFunction() instanceof InputFormatSource) {
+			InputFormatSource<T> fs = (InputFormatSource<T>) source.getOperator().getUserFunction();
 			streamGraph.setInputFormat(source.getId(), fs.getFormat());
 		}
 		streamGraph.setParallelism(source.getId(), source.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
index 86677a6..4517eea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 public interface OutputTypeConfigurable<OUT> {
 
 	/**
-	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
+	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, String, StreamOperator, TypeInformation, TypeInformation, String)}
 	 * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
 	 * method is called with the output {@link TypeInformation} which is also used for the
 	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
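
For orientation, a minimal sketch of an implementor of this interface follows. The class is hypothetical and not part of this commit; the setOutputType signature is assumed from the surrounding Flink sources of this version.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;

    // Hypothetical example: a component that lazily builds its output serializer
    // once StreamGraph generation has determined the output type.
    public class SerializerAwareComponent<T> implements OutputTypeConfigurable<T> {

        private TypeSerializer<T> serializer;

        @Override
        public void setOutputType(TypeInformation<T> outTypeInfo, ExecutionConfig executionConfig) {
            // Called while the StreamGraph is generated, i.e. before the task is deployed.
            this.serializer = outTypeInfo.createSerializer(executionConfig);
        }
    }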

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 69e920f..9ed715e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -69,7 +69,7 @@ public interface StreamOperator<OUT> extends Serializable {
 
 	 * <p>
 	 * The method is expected to flush all remaining buffered data. Exceptions during this flushing
-	 * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
+	 * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
 	 * because the last data items are not processed properly.
 	 * 
 	 * @throws java.lang.Exception An exception in this method causes the operator to fail.

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c5f983a..6e2e9f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -141,6 +141,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
 	}
 
+	public Object getCheckpointLock() {
+		return mockTask.getCheckpointLock();
+	}
+
 	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
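
A hedged sketch of how a test could use the new accessor, emitting elements under the same lock the mocked task exposes (StreamMap and the sample record are illustrative; only getCheckpointLock() is introduced by this change):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.operators.StreamMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    StreamMap<String, String> operator = new StreamMap<>(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    });

    OneInputStreamOperatorTestHarness<String, String> harness =
            new OneInputStreamOperatorTestHarness<>(operator);
    harness.open();

    // Mirror what the runtime does: hold the checkpoint lock while processing elements.
    synchronized (harness.getCheckpointLock()) {
        harness.processElement(new StreamRecord<>("hello", 0L));
    }

    harness.close();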

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7be7840..f6dab1e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-
-import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
+import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,14 +27,12 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source._
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
 import org.apache.flink.util.SplittableIterator
 
 import scala.collection.JavaConverters._
-
 import _root_.scala.language.implicitConversions
 
 @Public
@@ -454,20 +451,67 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
         DataStream[T] =
     asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
-
   /**
-   * Creates a DataStream that contains the contents of file created while
-   * system watches the given path. The file will be read with the system's
-   * default character set. The user can check the monitoring interval in milliseconds,
-   * and the way file modifications are handled. By default it checks for only new files
-   * every 100 milliseconds.
-   *
-   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
-                     watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
+    * Creates a DataStream that contains the contents of files created while
+    * the system watches the given path. The files will be read with the system's
+    * default character set. The user can specify the monitoring interval in milliseconds
+    * and the way file modifications are handled. By default it checks only for new files
+    * every 100 milliseconds.
+    *
+    */
+  @Deprecated
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100,
+                     watchType: FileMonitoringFunction.WatchType =
+                     FileMonitoringFunction.WatchType.ONLY_NEW_FILES): DataStream[String] =
     asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
 
   /**
+    * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
+    * Depending on the provided [[FileProcessingMode]], the source
+    * may periodically monitor (every `interval` ms) the path for new data
+    * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
+    * once the data currently in the path and exit
+    * ([[FileProcessingMode.PROCESS_ONCE]]). In addition,
+    * if the path contains files not to be processed, the user can specify a custom
+    * [[FilePathFilter]]. As a default implementation you can use
+    * [[FilePathFilter.createDefaultFilter()]].
+    *
+    * ** NOTES ON CHECKPOINTING: ** If the `watchType` is set to
+    * [[FileProcessingMode.PROCESS_ONCE]], the source monitors the path ** once **,
+    * creates the [[org.apache.flink.core.fs.FileInputSplit FileInputSplits]]
+    * to be processed, forwards them to the downstream
+    * [[ContinuousFileReaderOperator readers]] to read the actual data,
+    * and exits, without waiting for the readers to finish reading. This
+    * implies that no more checkpoint barriers are going to be forwarded
+    * after the source exits, thus having no checkpoints after that point.
+    *
+    * @param inputFormat
+    *          The input format used to create the data stream
+    * @param filePath
+    *          The path of the file, as a URI (e.g., "file:///some/local/file" or
+    *          "hdfs://host:port/file/path")
+    * @param watchType
+    *          The mode in which the source should operate, i.e. monitor path and react
+    *          to new data, or process once and exit
+    * @param interval
+    *          In the case of periodic path monitoring, this specifies the interval (in millis)
+    *          between consecutive path scans
+    * @param filter
+    *          The filter deciding which files/paths in the monitored directory are excluded from the processing
+    * @return The data stream that represents the data read from the given file
+    */
+  @PublicEvolving
+  def readFile[T: TypeInformation](
+      inputFormat: FileInputFormat[T],
+      filePath: String,
+      watchType: FileProcessingMode,
+      interval: Long,
+      filter: FilePathFilter): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo))
+  }
+
+  /**
    * Creates a new DataStream that contains the strings received infinitely
    * from socket. Received strings are decoded by the system's default
    * character set. The maximum retry interval is specified in seconds, in case

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
new file mode 100644
index 0000000..4c0f648
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
+
+	private static final int NO_OF_FILES = 9;
+	private static final int LINES_PER_FILE = 200;
+	private static final int NO_OF_RETRIES = 3;
+	private static final int PARALLELISM = 4;
+	private static final long INTERVAL = 2000;
+
+	private static File baseDir;
+	private static org.apache.hadoop.fs.FileSystem fs;
+	private static String localFsURI;
+	private FileCreator fc;
+
+	private static Map<Integer, List<String>> finalCollectedContent = new HashMap<>();
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+
+			localFsURI = "file:///" + baseDir +"/";
+			fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+
+		// set the restart strategy.
+		env.getConfig().setRestartStrategy(
+			RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
+		env.enableCheckpointing(20);
+		env.setParallelism(PARALLELISM);
+
+		// create and start the file creating thread.
+		fc = new FileCreator();
+		fc.start();
+
+		// create the monitoring source along with the necessary readers.
+		TestingSinkFunction sink = new TestingSinkFunction();
+		TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+		DataStream<String> inputStream = env.readFile(format, localFsURI,
+			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+
+		inputStream.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				out.collect(value);
+			}
+		}).addSink(sink).setParallelism(1);
+	}
+
+	@Override
+	public void postSubmit() throws Exception {
+		Map<Integer, List<String>> collected = finalCollectedContent;
+		Assert.assertEquals(collected.size(), fc.getFileContent().size());
+		for (Integer fileIdx: fc.getFileContent().keySet()) {
+			Assert.assertTrue(collected.keySet().contains(fileIdx));
+
+			List<String> cntnt = collected.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(fc.getFileContent().get(fileIdx), cntntStr.toString());
+		}
+
+		collected.clear();
+		finalCollectedContent.clear();
+		fc.clean();
+	}
+
+	private int getLineNo(String line) {
+		String[] tkns = line.split("\\s");
+		Assert.assertTrue(tkns.length == 6);
+		return Integer.parseInt(tkns[tkns.length - 1]);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	// -------------------------			FILE CREATION			-------------------------------
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
+	 * It is used to test the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator extends Thread {
+
+		private final Set<Path> filesCreated = new HashSet<>();
+		private final Map<Integer, String> fileContents = new HashMap<>();
+
+		public void run() {
+			try {
+				for(int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> file =
+						fillWithData(localFsURI, "file", i, "This is test line.");
+					filesCreated.add(file.f0);
+					fileContents.put(i, file.f1);
+
+					Thread.sleep((int) (INTERVAL / (3.0/2)));
+				}
+			} catch (IOException | InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+
+		void clean() throws IOException {
+			assert (fs != null);
+			for (org.apache.hadoop.fs.Path path: filesCreated) {
+				fs.delete(path, false);
+			}
+			fileContents.clear();
+		}
+
+		Map<Integer, String> getFileContent() {
+			return this.fileContents;
+		}
+	}
+
+	/**
+	 * Fills a file with generated content and returns the file path together with the content written.
+	 * */
+	private Tuple2<Path, String> fillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+		assert (fs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+
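+		// Write to a hidden "."-prefixed temp file and rename it at the end, so the monitoring
+		// source only ever sees complete files (the default FilePathFilter is expected to
+		// skip files whose names start with a dot).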
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = fs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for(int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+		fs.rename(tmp, file);
+		Assert.assertTrue("No result file present", fs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+
+	// --------------------------			Task Sink			------------------------------
+
+	private static class TestingSinkFunction extends RichSinkFunction<String>
+		implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
+
+		private static volatile boolean hasFailed = false;
+
+		private volatile int numSuccessfulCheckpoints;
+
+		private long count;
+
+		private long elementsToFailure;
+
+		private long elementCounter = 0;
+
+		private Map<Integer, Set<String>> collectedContent = new HashMap<>();
+
+		TestingSinkFunction() {
+			hasFailed = false;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+			long failurePosMin = (long) (0.4 * LINES_PER_FILE);
+			long failurePosMax = (long) (0.7 * LINES_PER_FILE);
+
+			elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+
+			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+				finalCollectedContent = new HashMap<>();
+				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
+					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
+				}
+				throw new SuccessException();
+			}
+		}
+
+		@Override
+		public void close() {
+			try {
+				super.close();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public void invoke(String value) throws Exception {
+			int fileIdx = Character.getNumericValue(value.charAt(0));
+
+			Set<String> content = collectedContent.get(fileIdx);
+			if (content == null) {
+				content = new HashSet<>();
+				collectedContent.put(fileIdx, content);
+			}
+
+			if (!content.add(value + "\n")) {
+				fail("Duplicate line: " + value);
+				System.exit(0);
+			}
+
+			elementCounter++;
+			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+				finalCollectedContent = new HashMap<>();
+				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
+					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
+				}
+				throw new SuccessException();
+			}
+
+			count++;
+			if (!hasFailed) {
+				Thread.sleep(2);
+				if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
+					hasFailed = true;
+					throw new Exception("Task Failure");
+				}
+			}
+		}
+
+		@Override
+		public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return new Tuple2<>(elementCounter, collectedContent);
+		}
+
+		@Override
+		public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
+			this.elementCounter = state.f0;
+			this.collectedContent = state.f1;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			numSuccessfulCheckpoints++;
+		}
+	}
+}

