flink-commits mailing list archives

From m..@apache.org
Subject [3/3] flink git commit: [FLINK-4800] Introduce the TimestampedFileInputSplit for Continuous File Processing
Date Thu, 27 Oct 2016 12:22:25 GMT
[FLINK-4800] Introduce the TimestampedFileInputSplit for Continuous File Processing

This commit mainly introduces the TimestampedFileInputSplit,
which extends the class FileInputSplit and also contains:
i) the modification time of the file it belongs to, and
ii) when checkpointing, the position in the split from which the
    reader is currently reading (see the sketch below).
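
For orientation, here is a minimal sketch of what such a split boils down
to, based only on the description above. The class, field, and accessor
names below are illustrative and not necessarily those used in the
committed TimestampedFileInputSplit.java:

    import java.io.Serializable;

    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    /**
     * Simplified sketch (not the committed class): a FileInputSplit that also
     * carries the modification time of its file and, when checkpointing, the
     * position the reader has reached within the split.
     */
    public class TimestampedFileInputSplitSketch extends FileInputSplit {

        /** Modification time of the file this split belongs to. */
        private final long modificationTime;

        /** Reader position inside the split at checkpoint time (null if reading has not started). */
        private Serializable readerPosition;

        public TimestampedFileInputSplitSketch(long modificationTime, int splitNumber,
                Path path, long start, long length, String[] hosts) {
            super(splitNumber, path, start, length, hosts);
            this.modificationTime = modificationTime;
        }

        public long getModificationTime() {
            return modificationTime;
        }

        public void setReaderPosition(Serializable position) {
            this.readerPosition = position;
        }

        public Serializable getReaderPosition() {
            return readerPosition;
        }
    }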

This will be useful for rescaling. With this addition, the
ContinuousFileMonitoringFunction sends TimestampedFileInputSplits
to the Readers, and the Readers' state now contains only
TimestampedFileInputSplits.
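
Concretely, one can picture a reader checkpoint as nothing more than the
splits still queued for processing plus the split currently being read,
the latter carrying its current read position; that list can then be
redistributed across readers on rescaling. A rough, hypothetical
illustration of the idea (the helper class and method names are not from
the commit, and it reuses the sketch class above):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    /** Hypothetical illustration of the reader-state idea described above. */
    class ReaderStateSketch {

        /**
         * On a checkpoint, the state is just the queued splits plus the split
         * currently being read, with the current read position attached to it.
         */
        static List<TimestampedFileInputSplitSketch> snapshotState(
                Queue<TimestampedFileInputSplitSketch> pendingSplits,
                TimestampedFileInputSplitSketch currentSplit,
                Serializable currentReadPosition) {

            List<TimestampedFileInputSplitSketch> state = new ArrayList<>(pendingSplits);
            if (currentSplit != null) {
                // remember where we were in the split we are currently reading
                currentSplit.setReaderPosition(currentReadPosition);
                state.add(currentSplit);
            }
            return state;
        }
    }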

In addition, this commit refactors the code of the
ContinuousFileMonitoringFunction and of the ContinuousFileReaderOperator,
along with the related tests.

This closes #2618.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b410c393
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b410c393
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b410c393

Branch: refs/heads/master
Commit: b410c393c960f55c09fadd4f22732d06f801b938
Parents: 2b600d3
Author: kl0u <kkloudas@gmail.com>
Authored: Fri Oct 7 00:21:42 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Oct 27 14:22:02 2016 +0200

----------------------------------------------------------------------
 .../ContinuousFileMonitoringFunctionITCase.java | 287 --------
 .../hdfstests/ContinuousFileMonitoringTest.java | 614 ----------------
 .../ContinuousFileProcessingITCase.java         | 306 ++++++++
 .../hdfstests/ContinuousFileProcessingTest.java | 699 +++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java |  18 +-
 .../ContinuousFileMonitoringFunction.java       | 208 +++---
 .../source/ContinuousFileReaderOperator.java    | 191 ++---
 .../functions/source/FileProcessingMode.java    |  11 +-
 .../source/TimestampedFileInputSplit.java       | 137 ++++
 ...ontinuousFileProcessingCheckpointITCase.java | 270 +++----
 .../TimestampedFileInputSplitTest.java          |  93 +++
 11 files changed, 1549 insertions(+), 1285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
deleted file mode 100644
index 079bf04..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hdfstests;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.api.common.io.FilePathFilter;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTestBase {
-
-	private static final int NO_OF_FILES = 10;
-	private static final int LINES_PER_FILE = 10;
-
-	private static final long INTERVAL = 100;
-
-	private File baseDir;
-
-	private org.apache.hadoop.fs.FileSystem hdfs;
-	private String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-
-	private static Map<Integer, String> expectedContents = new HashMap<>();
-
-	//						PREPARING FOR THE TESTS
-
-	@Before
-	public void createHDFS() {
-		try {
-			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-
-			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
-
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
-
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			FileUtil.fullyDelete(baseDir);
-			hdfsCluster.shutdown();
-		} catch (Throwable t) {
-			throw new RuntimeException(t);
-		}
-	}
-
-	//						END OF PREPARATIONS
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		/*
-		* This test checks the interplay between the monitor and the reader
-		* and also the failExternally() functionality. To test the latter we
-		* set the parallelism to 1 so that we have the chaining between the sink,
-		* which throws the SuccessException to signal the end of the test, and the
-		* reader.
-		* */
-
-		FileCreator fileCreator = new FileCreator(INTERVAL);
-		Thread t = new Thread(fileCreator);
-		t.start();
-
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		format.setFilePath(hdfsURI);
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(4);
-
-			format.setFilesFilter(FilePathFilter.createDefaultFilter());
-			ContinuousFileMonitoringFunction<String> monitoringFunction =
-				new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-					FileProcessingMode.PROCESS_CONTINUOUSLY,
-					env.getParallelism(), INTERVAL);
-
-			TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
-			ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-			TestingSinkFunction sink = new TestingSinkFunction();
-
-			DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
-			splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
-			env.execute();
-
-		} catch (Exception e) {
-			Throwable th = e;
-			int depth = 0;
-
-			for (; depth < 20; depth++) {
-				if (th instanceof SuccessException) {
-					try {
-						postSubmit();
-					} catch (Exception e1) {
-						e1.printStackTrace();
-					}
-					return;
-				} else if (th.getCause() != null) {
-					th = th.getCause();
-				} else {
-					break;
-				}
-			}
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	private static class TestingSinkFunction extends RichSinkFunction<String> {
-
-		private int elementCounter = 0;
-		private Map<Integer, Integer> elementCounters = new HashMap<>();
-		private Map<Integer, List<String>> collectedContent = new HashMap<>();
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void close() {
-			// check if the data that we collected are the ones they are supposed to be.
-
-			Assert.assertEquals(collectedContent.size(), expectedContents.size());
-			for (Integer fileIdx: expectedContents.keySet()) {
-				Assert.assertTrue(collectedContent.keySet().contains(fileIdx));
-
-				List<String> cntnt = collectedContent.get(fileIdx);
-				Collections.sort(cntnt, new Comparator<String>() {
-					@Override
-					public int compare(String o1, String o2) {
-						return getLineNo(o1) - getLineNo(o2);
-					}
-				});
-
-				StringBuilder cntntStr = new StringBuilder();
-				for (String line: cntnt) {
-					cntntStr.append(line);
-				}
-				Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
-			}
-			expectedContents.clear();
-		}
-
-		private int getLineNo(String line) {
-			String[] tkns = line.split("\\s");
-			Assert.assertTrue(tkns.length == 6);
-			return Integer.parseInt(tkns[tkns.length - 1]);
-		}
-
-		@Override
-		public void invoke(String value) throws Exception {
-			int fileIdx = Character.getNumericValue(value.charAt(0));
-
-			Integer counter = elementCounters.get(fileIdx);
-			if (counter == null) {
-				counter = 0;
-			} else if (counter == LINES_PER_FILE) {
-				// ignore duplicate lines.
-				Assert.fail("Duplicate lines detected.");
-			}
-			elementCounters.put(fileIdx, ++counter);
-
-			List<String> content = collectedContent.get(fileIdx);
-			if (content == null) {
-				content = new ArrayList<>();
-				collectedContent.put(fileIdx, content);
-			}
-			content.add(value + "\n");
-
-			elementCounter++;
-			if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
-				throw new SuccessException();
-			}
-		}
-	}
-
-	/**
-	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
-	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
-	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
-	 * */
-	private class FileCreator implements Runnable {
-
-		private final long interval;
-
-		FileCreator(long interval) {
-			this.interval = interval;
-		}
-
-		public void run() {
-			try {
-				for (int i = 0; i < NO_OF_FILES; i++) {
-					fillWithData(hdfsURI, "file", i, "This is test line.");
-					Thread.sleep(interval);
-				}
-			} catch (IOException e) {
-				e.printStackTrace();
-			} catch (InterruptedException e) {
-				// we just close without any message.
-			}
-		}
-	}
-
-	/**
-	 * Fill the file with content.
-	 * */
-	private void fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-		assert (hdfs != null);
-
-		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-
-		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-		FSDataOutputStream stream = hdfs.create(tmp);
-		StringBuilder str = new StringBuilder();
-		for (int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-			str.append(line);
-			stream.write(line.getBytes());
-		}
-		stream.close();
-
-		hdfs.rename(tmp, file);
-
-		expectedContents.put(fileIdx, str.toString());
-
-		Assert.assertTrue("No result file present", hdfs.exists(file));
-	}
-
-	public static class SuccessException extends Exception {
-		private static final long serialVersionUID = -7011865671593955887L;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
deleted file mode 100644
index 198a621..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.common.io.FilePathFilter;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class ContinuousFileMonitoringTest {
-
-	private static final int NO_OF_FILES = 10;
-	private static final int LINES_PER_FILE = 10;
-
-	private static final long INTERVAL = 100;
-
-	private static File baseDir;
-
-	private static org.apache.hadoop.fs.FileSystem hdfs;
-	private static String hdfsURI;
-	private static MiniDFSCluster hdfsCluster;
-
-	//						PREPARING FOR THE TESTS
-
-	@BeforeClass
-	public static void createHDFS() {
-		try {
-			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-
-			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
-
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
-
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void destroyHDFS() {
-		try {
-			FileUtil.fullyDelete(baseDir);
-			hdfsCluster.shutdown();
-		} catch (Throwable t) {
-			throw new RuntimeException(t);
-		}
-	}
-
-	//						END OF PREPARATIONS
-
-	//						TESTS
-
-	@Test
-	public void testFileReadingOperatorWithIngestionTime() throws Exception {
-		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
-		Map<Integer, String> expectedFileContents = new HashMap<>();
-
-		for(int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
-			filesCreated.add(file.f0);
-			expectedFileContents.put(i, file.f1);
-		}
-
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
-		final long watermarkInterval = 10;
-
-		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-		reader.setOutputType(typeInfo, new ExecutionConfig());
-
-		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
-			new OneInputStreamOperatorTestHarness<>(reader);
-
-		tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
-
-
-		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		tester.open();
-
-		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
-
-		// test that watermarks are correctly emitted
-
-		tester.setProcessingTime(201);
-		tester.setProcessingTime(301);
-		tester.setProcessingTime(401);
-		tester.setProcessingTime(501);
-
-		int i = 0;
-		for(Object line: tester.getOutput()) {
-			if (!(line instanceof Watermark)) {
-				Assert.fail("Only watermarks are expected here ");
-			}
-			Watermark w = (Watermark) line;
-			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
-			i++;
-		}
-
-		// clear the output to get the elements only and the final watermark
-		tester.getOutput().clear();
-		Assert.assertEquals(0, tester.getOutput().size());
-
-		// create the necessary splits for the test
-		FileInputSplit[] splits = format.createInputSplits(
-			reader.getRuntimeContext().getNumberOfParallelSubtasks());
-
-		// and feed them to the operator
-		Map<Integer, List<String>> actualFileContents = new HashMap<>();
-
-		long lastSeenWatermark = Long.MIN_VALUE;
-		int lineCounter = 0;	// counter for the lines read from the splits
-		int watermarkCounter = 0;
-
-		for(FileInputSplit split: splits) {
-
-			// set the next "current processing time".
-			long nextTimestamp = tester.getProcessingTime() + watermarkInterval;
-			tester.setProcessingTime(nextTimestamp);
-
-			// send the next split to be read and wait until it is fully read.
-			tester.processElement(new StreamRecord<>(split));
-			synchronized (tester.getCheckpointLock()) {
-				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
-					tester.getCheckpointLock().wait(10);
-				}
-			}
-
-			// verify that the results are the expected
-			for(Object line: tester.getOutput()) {
-				if (line instanceof StreamRecord) {
-					StreamRecord<String> element = (StreamRecord<String>) line;
-					lineCounter++;
-
-					Assert.assertEquals(nextTimestamp, element.getTimestamp());
-
-					int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
-					List<String> content = actualFileContents.get(fileIdx);
-					if (content == null) {
-						content = new ArrayList<>();
-						actualFileContents.put(fileIdx, content);
-					}
-					content.add(element.getValue() + "\n");
-				} else if (line instanceof Watermark) {
-					long watermark = ((Watermark) line).getTimestamp();
-
-					Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
-					Assert.assertTrue(watermark > lastSeenWatermark);
-					watermarkCounter++;
-
-					lastSeenWatermark = watermark;
-				} else {
-					Assert.fail("Unknown element in the list.");
-				}
-			}
-
-			// clean the output to be ready for the next split
-			tester.getOutput().clear();
-		}
-
-		// now we are processing one split after the other,
-		// so all the elements must be here by now.
-		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
-
-		// because we expect one watermark per split.
-		Assert.assertEquals(NO_OF_FILES, watermarkCounter);
-
-		// then close the reader gracefully so that the Long.MAX watermark is emitted
-		synchronized (tester.getCheckpointLock()) {
-			tester.close();
-		}
-
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
-			hdfs.delete(file, false);
-		}
-
-		// check if the last element is the LongMax watermark (by now this must be the only element)
-		Assert.assertEquals(1, tester.getOutput().size());
-		Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
-		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
-
-		// check if the elements are the expected ones.
-		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
-		for (Integer fileIdx: expectedFileContents.keySet()) {
-			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
-
-			List<String> cntnt = actualFileContents.get(fileIdx);
-			Collections.sort(cntnt, new Comparator<String>() {
-				@Override
-				public int compare(String o1, String o2) {
-					return getLineNo(o1) - getLineNo(o2);
-				}
-			});
-
-			StringBuilder cntntStr = new StringBuilder();
-			for (String line: cntnt) {
-				cntntStr.append(line);
-			}
-			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
-		}
-	}
-
-	@Test
-	public void testFileReadingOperator() throws Exception {
-		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
-		Map<Integer, String> expectedFileContents = new HashMap<>();
-		for(int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
-			filesCreated.add(file.f0);
-			expectedFileContents.put(i, file.f1);
-		}
-
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
-		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-		reader.setOutputType(typeInfo, new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
-			new OneInputStreamOperatorTestHarness<>(reader);
-		tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		tester.open();
-
-		// create the necessary splits for the test
-		FileInputSplit[] splits = format.createInputSplits(
-			reader.getRuntimeContext().getNumberOfParallelSubtasks());
-
-		// and feed them to the operator
-		for(FileInputSplit split: splits) {
-			tester.processElement(new StreamRecord<>(split));
-		}
-
-		// then close the reader gracefully (and wait to finish reading)
-		synchronized (tester.getCheckpointLock()) {
-			tester.close();
-		}
-
-		// the lines received must be the elements in the files +1 for for the longMax watermark
-		// we are in event time, which emits no watermarks, so the last watermark will mark the
-		// of the input stream.
-
-		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
-
-		Map<Integer, List<String>> actualFileContents = new HashMap<>();
-		Object lastElement = null;
-		for(Object line: tester.getOutput()) {
-			lastElement = line;
-			if (line instanceof StreamRecord) {
-				StreamRecord<String> element = (StreamRecord<String>) line;
-
-				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
-				List<String> content = actualFileContents.get(fileIdx);
-				if (content == null) {
-					content = new ArrayList<>();
-					actualFileContents.put(fileIdx, content);
-				}
-				content.add(element.getValue() + "\n");
-			}
-		}
-
-		// check if the last element is the LongMax watermark
-		Assert.assertTrue(lastElement instanceof Watermark);
-		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
-
-		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
-		for (Integer fileIdx: expectedFileContents.keySet()) {
-			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
-
-			List<String> cntnt = actualFileContents.get(fileIdx);
-			Collections.sort(cntnt, new Comparator<String>() {
-				@Override
-				public int compare(String o1, String o2) {
-					return getLineNo(o1) - getLineNo(o2);
-				}
-			});
-
-			StringBuilder cntntStr = new StringBuilder();
-			for (String line: cntnt) {
-				cntntStr.append(line);
-			}
-			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
-		}
-
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
-			hdfs.delete(file, false);
-		}
-	}
-
-	private static class PathFilter extends FilePathFilter {
-
-		@Override
-		public boolean filterPath(Path filePath) {
-			return filePath.getName().startsWith("**");
-		}
-	}
-
-	@Test
-	public void testFilePathFiltering() throws Exception {
-		Set<String> uniqFilesFound = new HashSet<>();
-		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
-
-		// create the files to be discarded
-		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "**file", i, "This is test line.");
-			filesCreated.add(file.f0);
-		}
-
-		// create the files to be kept
-		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
-			filesCreated.add(file.f0);
-		}
-
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		format.setFilesFilter(new PathFilter());
-		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
-
-		monitoringFunction.open(new Configuration());
-		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
-
-		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-		for(int i = 0; i < NO_OF_FILES; i++) {
-			org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-			Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-		}
-
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
-			hdfs.delete(file, false);
-		}
-	}
-
-	@Test
-	public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
-		final Set<String> uniqFilesFound = new HashSet<>();
-
-		FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
-		fc.start();
-
-		Thread t = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-				format.setFilesFilter(FilePathFilter.createDefaultFilter());
-				ContinuousFileMonitoringFunction<String> monitoringFunction =
-					new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-						FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-				try {
-					monitoringFunction.open(new Configuration());
-					monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
-				} catch (Exception e) {
-					// do nothing as we interrupted the thread.
-				}
-			}
-		});
-		t.start();
-
-		// wait until the sink also sees all the splits.
-		synchronized (uniqFilesFound) {
-			uniqFilesFound.wait();
-		}
-		t.interrupt();
-		fc.join();
-
-		Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
-		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-
-		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
-		Set<String> fileNamesCreated = new HashSet<>();
-		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
-			fileNamesCreated.add(path.toString());
-		}
-
-		for(String file: uniqFilesFound) {
-			Assert.assertTrue(fileNamesCreated.contains(file));
-		}
-
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
-			hdfs.delete(file, false);
-		}
-	}
-
-	@Test
-	public void testFileSplitMonitoringProcessOnce() throws Exception {
-		Set<String> uniqFilesFound = new HashSet<>();
-
-		FileCreator fc = new FileCreator(INTERVAL, 1);
-		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
-		fc.start();
-
-		// to make sure that at least one file is created
-		if (filesCreated.size() == 0) {
-			synchronized (filesCreated) {
-				if (filesCreated.size() == 0) {
-					filesCreated.wait();
-				}
-			}
-		}
-		Assert.assertTrue(fc.getFilesCreated().size() >= 1);
-
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		format.setFilesFilter(FilePathFilter.createDefaultFilter());
-		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
-
-		monitoringFunction.open(new Configuration());
-		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
-
-		// wait until all the files are created
-		fc.join();
-
-		Assert.assertEquals(NO_OF_FILES, filesCreated.size());
-
-		Set<String> fileNamesCreated = new HashSet<>();
-		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
-			fileNamesCreated.add(path.toString());
-		}
-
-		Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < fileNamesCreated.size());
-		for(String file: uniqFilesFound) {
-			Assert.assertTrue(fileNamesCreated.contains(file));
-		}
-
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
-			hdfs.delete(file, false);
-		}
-	}
-
-	// -------------		End of Tests
-
-	private int getLineNo(String line) {
-		String[] tkns = line.split("\\s");
-		Assert.assertEquals(6, tkns.length);
-		return Integer.parseInt(tkns[tkns.length - 1]);
-	}
-
-	/**
-	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
-	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
-	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
-	 * */
-	private class FileCreator extends Thread {
-
-		private final long interval;
-		private final int noOfFilesBeforeNotifying;
-
-		private final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
-
-		FileCreator(long interval, int notificationLim) {
-			this.interval = interval;
-			this.noOfFilesBeforeNotifying = notificationLim;
-		}
-
-		public void run() {
-			try {
-				for(int i = 0; i < NO_OF_FILES; i++) {
-					Tuple2<org.apache.hadoop.fs.Path, String> file =
-						fillWithData(hdfsURI, "file", i, "This is test line.");
-
-					synchronized (filesCreated) {
-						filesCreated.add(file.f0);
-						if (filesCreated.size() == noOfFilesBeforeNotifying) {
-							filesCreated.notifyAll();
-						}
-					}
-					Thread.sleep(interval);
-				}
-			} catch (IOException | InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-
-		Set<org.apache.hadoop.fs.Path> getFilesCreated() {
-			return this.filesCreated;
-		}
-	}
-
-	private class TestingSourceContext implements SourceFunction.SourceContext<FileInputSplit> {
-
-		private final ContinuousFileMonitoringFunction src;
-		private final Set<String> filesFound;
-		private final Object lock = new Object();
-
-		TestingSourceContext(ContinuousFileMonitoringFunction monitoringFunction, Set<String> uniqFilesFound) {
-			this.filesFound = uniqFilesFound;
-			this.src = monitoringFunction;
-		}
-
-		@Override
-		public void collect(FileInputSplit element) {
-
-			String filePath = element.getPath().toString();
-			if (filesFound.contains(filePath)) {
-				// check if we have duplicate splits that are open during the first time
-				// the monitor sees them, and they then close, so the modification time changes.
-				Assert.fail("Duplicate file: " + filePath);
-			}
-
-			synchronized (filesFound) {
-				filesFound.add(filePath);
-				try {
-					if (filesFound.size() == NO_OF_FILES) {
-						this.src.cancel();
-						this.src.close();
-						filesFound.notifyAll();
-					}
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-			}
-		}
-
-		@Override
-		public void collectWithTimestamp(FileInputSplit element, long timestamp) {
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lock;
-		}
-
-		@Override
-		public void close() {
-		}
-	}
-
-	/**
-	 * Fill the file with content.
-	 * */
-	private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-		assert (hdfs != null);
-
-		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-		Assert.assertFalse(hdfs.exists(file));
-
-		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-		FSDataOutputStream stream = hdfs.create(tmp);
-		StringBuilder str = new StringBuilder();
-		for(int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-			str.append(line);
-			stream.write(line.getBytes());
-		}
-		stream.close();
-
-		hdfs.rename(tmp, file);
-
-		Assert.assertTrue("No result file present", hdfs.exists(file));
-		return new Tuple2<>(file, str.toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
new file mode 100644
index 0000000..3211a20
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
+
+	private static final int NO_OF_FILES = 5;
+	private static final int LINES_PER_FILE = 100;
+
+	private static final int PARALLELISM = 4;
+	private static final long INTERVAL = 100;
+
+	private File baseDir;
+
+	private org.apache.hadoop.fs.FileSystem hdfs;
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+
+	private static Map<Integer, String> expectedContents = new HashMap<>();
+
+	//						PREPARING FOR THE TESTS
+
+	@Before
+	public void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		/*
+		* This test checks the interplay between the monitor and the reader
+		* and also the failExternally() functionality. To test the latter we
+		* set the parallelism to 1 so that we have the chaining between the sink,
+		* which throws the SuccessException to signal the end of the test, and the
+		* reader.
+		* */
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilePath(hdfsURI);
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
+		// create the stream execution environment with a parallelism > 1 to test
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+				FileProcessingMode.PROCESS_CONTINUOUSLY,
+				env.getParallelism(), INTERVAL);
+
+		// the monitor has always DOP 1
+		DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
+		Assert.assertEquals(1, splits.getParallelism());
+
+		ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		// the readers can be multiple
+		DataStream<String> content = splits.transform("FileSplitReader", typeInfo, reader);
+		Assert.assertEquals(PARALLELISM, content.getParallelism());
+
+		// finally for the sink we set the parallelism to 1 so that we can verify the output
+		TestingSinkFunction sink = new TestingSinkFunction();
+		content.addSink(sink).setParallelism(1);
+
+		Thread job = new Thread() {
+
+			@Override
+			public void run() {
+				try {
+					env.execute("ContinuousFileProcessingITCase Job.");
+				} catch (Exception e) {
+					Throwable th = e;
+					for (int depth = 0; depth < 20; depth++) {
+						if (th instanceof SuccessException) {
+							try {
+								postSubmit();
+							} catch (Exception e1) {
+								e1.printStackTrace();
+							}
+							return;
+						} else if (th.getCause() != null) {
+							th = th.getCause();
+						} else {
+							break;
+						}
+					}
+					e.printStackTrace();
+					Assert.fail(e.getMessage());
+				}
+			}
+		};
+		job.start();
+
+		// The modification time of the last created file.
+		long lastCreatedModTime = Long.MIN_VALUE;
+
+		// create the files to be read
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
+			long modTime;
+			do {
+
+				// give it some time so that the files have
+				// different modification timestamps.
+				Thread.sleep(50);
+
+				tmpFile = fillWithData(hdfsURI, "file", i, "This is test line.");
+
+				modTime = hdfs.getFileStatus(tmpFile.f0).getModificationTime();
+				if (modTime <= lastCreatedModTime) {
+					// delete the last created file to recreate it with a different timestamp
+					hdfs.delete(tmpFile.f0, false);
+				}
+			} while (modTime <= lastCreatedModTime);
+			lastCreatedModTime = modTime;
+
+			// put the contents in the expected results list before the reader picks them
+			// this is to guarantee that they are in before the reader finishes (avoid race conditions)
+			expectedContents.put(i, tmpFile.f1);
+
+			org.apache.hadoop.fs.Path file =
+				new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
+			hdfs.rename(tmpFile.f0, file);
+			Assert.assertTrue(hdfs.exists(file));
+		}
+
+		// wait for the job to finish.
+		job.join();
+	}
+
+	private static class TestingSinkFunction extends RichSinkFunction<String> {
+
+		private int elementCounter = 0;
+		private Map<Integer, Set<String>> actualContent = new HashMap<>();
+
+		private transient Comparator<String> comparator;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+			comparator = new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			};
+		}
+
+		@Override
+		public void invoke(String value) throws Exception {
+			int fileIdx = getFileIdx(value);
+
+			Set<String> content = actualContent.get(fileIdx);
+			if (content == null) {
+				content = new HashSet<>();
+				actualContent.put(fileIdx, content);
+			}
+
+			if (!content.add(value + "\n")) {
+				Assert.fail("Duplicate line: "+ value);
+				System.exit(0);
+			}
+
+			elementCounter++;
+			if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
+				throw new SuccessException();
+			}
+		}
+
+		@Override
+		public void close() {
+			// check if the data that we collected are the ones they are supposed to be.
+			Assert.assertEquals(expectedContents.size(), actualContent.size());
+			for (Integer fileIdx: expectedContents.keySet()) {
+				Assert.assertTrue(actualContent.keySet().contains(fileIdx));
+
+				List<String> cntnt = new ArrayList<>(actualContent.get(fileIdx));
+				Collections.sort(cntnt, comparator);
+
+				StringBuilder cntntStr = new StringBuilder();
+				for (String line: cntnt) {
+					cntntStr.append(line);
+				}
+				Assert.assertEquals(expectedContents.get(fileIdx), cntntStr.toString());
+			}
+			expectedContents.clear();
+		}
+
+		private int getLineNo(String line) {
+			String[] tkns = line.split("\\s");
+			return Integer.parseInt(tkns[tkns.length - 1]);
+		}
+
+		private int getFileIdx(String line) {
+			String[] tkns = line.split(":");
+			return Integer.parseInt(tkns[0]);
+		}
+	}
+
+	/** Create a file and fill it with content. */
+	private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
+
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path tmp =
+			new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx + ": " + sampleLine + " " + i + "\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+		return new Tuple2<>(tmp, str.toString());
+	}
+
+	public static class SuccessException extends Exception {
+		private static final long serialVersionUID = -7011865671593955887L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
new file mode 100644
index 0000000..0283f68
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ContinuousFileProcessingTest {
+
+	private static final int NO_OF_FILES = 5;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private static File baseDir;
+
+	private static org.apache.hadoop.fs.FileSystem hdfs;
+	private static String hdfsURI;
+	private static MiniDFSCluster hdfsCluster;
+
+	//						PREPARING FOR THE TESTS
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	//						TESTS
+
+	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+		Map<String, Long> modTimes = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		final long watermarkInterval = 10;
+
+		ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
+		final OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader);
+
+		tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
+		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		reader.setOutputType(typeInfo, tester.getExecutionConfig());
+
+		tester.open();
+
+		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+		// test that watermarks are correctly emitted
+
+		ConcurrentLinkedQueue<Object> output = tester.getOutput();
+
+		tester.setProcessingTime(201);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp());
+
+		tester.setProcessingTime(301);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp());
+
+		tester.setProcessingTime(401);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp());
+
+		tester.setProcessingTime(501);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp());
+
+		Assert.assertTrue(output.isEmpty());
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+		long lastSeenWatermark = Long.MIN_VALUE;
+		int lineCounter = 0;	// counter for the lines read from the splits
+		int watermarkCounter = 0;
+
+		for (FileInputSplit split: splits) {
+
+			// set the next "current processing time".
+			long nextTimestamp = tester.getProcessingTime() + watermarkInterval;
+			tester.setProcessingTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read, the +1 is for the watermark.
+			tester.processElement(new StreamRecord<>(
+				new TimestampedFileInputSplit(modTimes.get(split.getPath().getName()),
+					split.getSplitNumber(), split.getPath(), split.getStart(),
+					split.getLength(), split.getHostnames())));
+
+			// NOTE: the following check works because each file fits in one split.
+			// Otherwise it would fail and wait forever.
+			// BUT THIS IS JUST FOR THIS TEST
+			while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+				Thread.sleep(10);
+			}
+
+			// verify that the results are the expected
+			for (Object line: tester.getOutput()) {
+
+				if (line instanceof StreamRecord) {
+
+					@SuppressWarnings("unchecked")
+					StreamRecord<String> element = (StreamRecord<String>) line;
+					lineCounter++;
+
+					Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+					int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+					List<String> content = actualFileContents.get(fileIdx);
+					if (content == null) {
+						content = new ArrayList<>();
+						actualFileContents.put(fileIdx, content);
+					}
+					content.add(element.getValue() + "\n");
+				} else if (line instanceof Watermark) {
+					long watermark = ((Watermark) line).getTimestamp();
+
+					Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+					Assert.assertTrue(watermark > lastSeenWatermark);
+					watermarkCounter++;
+
+					lastSeenWatermark = watermark;
+				} else {
+					Assert.fail("Unknown element in the list.");
+				}
+			}
+
+			// clean the output to be ready for the next split
+			tester.getOutput().clear();
+		}
+
+		// now we are processing one split after the other,
+		// so all the elements must be here by now.
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+		// because we expect one watermark per split.
+		Assert.assertEquals(splits.length, watermarkCounter);
+
+		// then close the reader gracefully so that the Long.MAX watermark is emitted
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+
+		// check if the last element is the LongMax watermark (by now this must be the only element)
+		Assert.assertEquals(1, tester.getOutput().size());
+		Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+		// check if the elements are the expected ones.
+		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+		}
+	}
+
+	@Test
+	public void testFileReadingOperatorWithEventTime() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<String, Long> modTimes = new HashMap<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader);
+		tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		tester.open();
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for (FileInputSplit split: splits) {
+			tester.processElement(new StreamRecord<>(new TimestampedFileInputSplit(
+				modTimes.get(split.getPath().getName()), split.getSplitNumber(), split.getPath(),
+				split.getStart(), split.getLength(), split.getHostnames())));
+		}
+
+		// then close the reader gracefully (and wait for it to finish reading)
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		// the lines received must be the elements in the files, +1 for the Long.MAX watermark.
+		// we are in event time and no periodic watermarks are emitted, so the final Long.MAX
+		// watermark marks the end of the input stream.
+
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
+
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+		Object lastElement = null;
+		for (Object line: tester.getOutput()) {
+			lastElement = line;
+
+			if (line instanceof StreamRecord) {
+
+				@SuppressWarnings("unchecked")
+				StreamRecord<String> element = (StreamRecord<String>) line;
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List<String> content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
+			}
+		}
+
+		// check if the last element is the LongMax watermark
+		Assert.assertTrue(lastElement instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
+		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+		}
+
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	////				Monitoring Function Tests				//////
+
+	@Test
+	public void testFilePathFiltering() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Set<String> filesKept = new TreeSet<>();
+
+		// create the files to be discarded
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
+			filesCreated.add(file.f0);
+		}
+
+		// create the files to be kept
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file =
+				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			filesKept.add(file.f0.getName());
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(new FilePathFilter() {
+			@Override
+			public boolean filterPath(Path filePath) {
+				return filePath.getName().startsWith("**");
+			}
+		});
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		final FileVerifyingSourceContext context =
+			new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(context);
+
+		Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray());
+
+		// finally delete the files created for the test.
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	@Test
+	public void testSortingOnModTime() throws Exception {
+		final long[] modTimes = new long[NO_OF_FILES];
+		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
+
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file =
+				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			Thread.sleep(400);
+
+			filesCreated[i] = file.f0;
+			modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
+		// this is just to verify that all splits have been forwarded later.
+		FileInputSplit[] splits = format.createInputSplits(1);
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(context);
+		Assert.assertEquals(splits.length, context.getCounter());
+
+		// delete the created files.
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			hdfs.delete(filesCreated[i], false);
+		}
+	}
+
+	@Test
+	public void testProcessOnce() throws Exception {
+		final OneShotLatch latch = new OneShotLatch();
+
+		// create a single file in the directory
+		Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
+			createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+		Assert.assertTrue(hdfs.exists(bootstrap.f0));
+
+		// the source is supposed to read only this file.
+		final Set<String> filesToBeRead = new TreeSet<>();
+		filesToBeRead.add(bootstrap.f0.getName());
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		final FileVerifyingSourceContext context =
+			new FileVerifyingSourceContext(latch, monitoringFunction, 1, -1);
+
+		final Thread t = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.open(new Configuration());
+					monitoringFunction.run(context);
+				} catch (Exception e) {
+					Assert.fail(e.getMessage());
+				}
+			}
+		};
+		t.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		// create some additional files that would be processed in the PROCESS_CONTINUOUSLY case,
+		// but must be ignored here because the mode is PROCESS_ONCE
+		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
+				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated[i] = ignoredFile.f0;
+		}
+
+		// wait until the monitoring thread exits
+		t.join();
+
+		Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());
+
+		// finally delete the files created for the test.
+		hdfs.delete(bootstrap.f0, false);
+		for (org.apache.hadoop.fs.Path path: filesCreated) {
+			hdfs.delete(path, false);
+		}
+	}
+
+	@Test
+	public void testProcessContinuously() throws Exception {
+		final OneShotLatch latch = new OneShotLatch();
+
+		// create a single file in the directory
+		Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
+			createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+		Assert.assertTrue(hdfs.exists(bootstrap.f0));
+
+		final Set<String> filesToBeRead = new TreeSet<>();
+		filesToBeRead.add(bootstrap.f0.getName());
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
+		final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
+			monitoringFunction, 1, totalNoOfFilesToBeRead);
+
+		final Thread t = new Thread() {
+
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.open(new Configuration());
+					monitoringFunction.run(context);
+				} catch (Exception e) {
+					Assert.fail(e.getMessage());
+				}
+			}
+		};
+		t.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		// create some additional files that will be processed in the case of PROCESS_CONTINUOUSLY
+		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file =
+				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated[i] = file.f0;
+			filesToBeRead.add(file.f0.getName());
+		}
+
+		// wait until the monitoring thread exits
+		t.join();
+
+		Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());
+
+		// finally delete the files created for the test.
+		hdfs.delete(bootstrap.f0, false);
+		for (org.apache.hadoop.fs.Path path: filesCreated) {
+			hdfs.delete(path, false);
+		}
+	}
+
+	///////////				Source Contexts Used by the tests				/////////////////
+
+	private static class FileVerifyingSourceContext extends DummySourceContext {
+
+		private final ContinuousFileMonitoringFunction src;
+		private final OneShotLatch latch;
+		private final Set<String> seenFiles;
+		private final int elementsBeforeNotifying;
+
+		private int elementsBeforeCanceling = -1;
+
+		FileVerifyingSourceContext(OneShotLatch latch,
+								ContinuousFileMonitoringFunction src,
+								int elementsBeforeNotifying,
+								int elementsBeforeCanceling) {
+			this.latch = latch;
+			this.seenFiles = new TreeSet<>();
+			this.src = src;
+			this.elementsBeforeNotifying = elementsBeforeNotifying;
+			this.elementsBeforeCanceling = elementsBeforeCanceling;
+		}
+
+		Set<String> getSeenFiles() {
+			return this.seenFiles;
+		}
+
+		@Override
+		public void collect(TimestampedFileInputSplit element) {
+			String seenFileName = element.getPath().getName();
+
+			this.seenFiles.add(seenFileName);
+			if (seenFiles.size() == elementsBeforeNotifying) {
+				latch.trigger();
+			}
+
+			if (elementsBeforeCanceling != -1 && seenFiles.size() == elementsBeforeCanceling) {
+				src.cancel();
+			}
+		}
+	}
+
+	private static class ModTimeVerifyingSourceContext extends DummySourceContext {
+
+		final long[] expectedModificationTimes;
+		int splitCounter;
+		long lastSeenModTime;
+
+		ModTimeVerifyingSourceContext(long[] modTimes) {
+			this.expectedModificationTimes = modTimes;
+			this.splitCounter = 0;
+			this.lastSeenModTime = Long.MIN_VALUE;
+		}
+
+		int getCounter() {
+			return splitCounter;
+		}
+
+		@Override
+		public void collect(TimestampedFileInputSplit element) {
+			try {
+				long modTime = hdfs.getFileStatus(new org.apache.hadoop.fs.Path(element.getPath().getPath())).getModificationTime();
+
+				Assert.assertTrue(modTime >= lastSeenModTime);
+				Assert.assertEquals(expectedModificationTimes[splitCounter], modTime);
+
+				lastSeenModTime = modTime;
+				splitCounter++;
+			} catch (IOException e) {
+				Assert.fail(e.getMessage());
+			}
+		}
+	}
+
+	private static abstract class DummySourceContext
+			implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+
+		private final Object lock = new Object();
+
+		@Override
+		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/////////				Auxiliary Methods				/////////////
+
+	private int getLineNo(String line) {
+		String[] tkns = line.split("\\s");
+		Assert.assertEquals(6, tkns.length);
+		return Integer.parseInt(tkns[tkns.length - 1]);
+	}
+
+	/**
+	 * Creates a file and fills it with {@code LINES_PER_FILE} lines, each of the form
+	 * {@code fileIdx + ": " + sampleLine + " " + lineNo}.
+	 */
+	private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
+				String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+		Assert.assertFalse(hdfs.exists(file));
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+}
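
Note on the reader tests above: both event-time tests hand the operator TimestampedFileInputSplits
built from plain FileInputSplits plus the modification time of the file each split belongs to.
A minimal sketch of that wrapping step follows; the helper name and the modTimes map are
illustrative, while the constructor arguments are exactly the ones used in the tests.

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class SplitWrappingSketch {

        // Mirrors what the tests do before calling
        // tester.processElement(new StreamRecord<>(timestampedSplit)).
        static List<TimestampedFileInputSplit> wrapSplits(
                FileInputFormat<?> format,
                Map<String, Long> modTimes,   // file name -> modification time (e.g. from FileSystem#getFileStatus)
                int parallelism) throws IOException {

            List<TimestampedFileInputSplit> wrapped = new ArrayList<>();
            for (FileInputSplit split : format.createInputSplits(parallelism)) {
                // the modification time of the file this split belongs to
                long modTime = modTimes.get(split.getPath().getName());
                wrapped.add(new TimestampedFileInputSplit(
                    modTime, split.getSplitNumber(), split.getPath(),
                    split.getStart(), split.getLength(), split.getHostnames()));
            }
            return wrapped;
        }
    }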

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e1cfc60..08e17a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -82,10 +82,6 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * An ExecutionEnvironment for streaming jobs. An instance of it is
- * necessary to construct streaming topologies.
- */
-/**
  * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
  * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
  * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
@@ -1350,18 +1346,20 @@ public abstract class StreamExecutionEnvironment {
 		Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
 
 		Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
-						interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
-				"The path monitoring interval cannot be less than " +
-						ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
+				interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+			"The path monitoring interval cannot be less than " +
+					ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
 
-		ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
+		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(
 				inputFormat, inputFormat.getFilePath().toString(),
 				monitoringMode, getParallelism(), interval);
 
-		ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
+		ContinuousFileReaderOperator<OUT> reader =
+			new ContinuousFileReaderOperator<>(inputFormat);
 
 		SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
-				.transform("FileSplitReader_" + sourceName, typeInfo, reader);
+				.transform("Split Reader: " + sourceName, typeInfo, reader);
 
 		return new DataStreamSource<>(source);
 	}
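
For illustration, a user job that goes through this wiring end-to-end might look as follows.
This is only a sketch: the hunk above shows the method body but not the public readFile(...)
signature, so the overload used here is an assumption, and the monitored path is made up.

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ContinuousTextFileJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // the directory to monitor (illustrative path)
            String path = "hdfs:///tmp/input";
            TextInputFormat format = new TextInputFormat(new Path(path));

            // assumed overload: (format, path, mode, scan interval in ms)
            DataStream<String> lines = env.readFile(
                format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L);

            lines.print();
            env.execute("continuous file reading");
        }
    }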

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index f9ef565..a6c5e49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -19,13 +19,11 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -35,52 +33,59 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 /**
- * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for
- * i) monitoring a user-provided path, ii) deciding which files should be further read and processed,
- * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning
- * them to downstream tasks for further reading and processing. Which splits will be further processed
- * depends on the user-provided {@link FileProcessingMode} and the {@link FilePathFilter}.
- * The splits of the files to be read are then forwarded to the downstream
- * {@link ContinuousFileReaderOperator} which can have parallelism greater than one.
+ * This is the single (non-parallel) monitoring task which takes a {@link FileInputFormat}
+ * and, depending on the {@link FileProcessingMode} and the {@link FilePathFilter}, it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided path.</li>
+ *     <li>Deciding which files should be further read and processed.</li>
+ *     <li>Creating the {@link FileInputSplit splits} corresponding to those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * The splits to be read are forwarded to the downstream {@link ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in ascending modification time order,
+ * based on the modification time of the files they belong to.
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements Checkpointed<Long> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
 
 	/**
-	 * The minimum interval allowed between consecutive path scans. This is applicable if the
-	 * {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+	 * The minimum interval allowed between consecutive path scans.
+	 * <p><b>NOTE:</b> Only applicable to the {@code PROCESS_CONTINUOUSLY} mode.
 	 */
-	public static final long MIN_MONITORING_INTERVAL = 100l;
+	public static final long MIN_MONITORING_INTERVAL = 1L;
 
 	/** The path to monitor. */
 	private final String path;
 
-	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+	/** The parallelism of the downstream readers. */
 	private final int readerParallelism;
 
 	/** The {@link FileInputFormat} to be read. */
 	private FileInputFormat<OUT> format;
 
-	/** How often to monitor the state of the directory for new data. */
+	/** The interval between consecutive path scans. */
 	private final long interval;
 
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	private Long globalModificationTime;
+	/** The maximum file modification time seen so far. */
+	private volatile long globalModificationTime;
 
 	private transient Object checkpointLock;
 
@@ -91,10 +96,12 @@ public class ContinuousFileMonitoringFunction<OUT>
 		FileProcessingMode watchType,
 		int readerParallelism, long interval) {
 
-		if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
-			throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is " +
-				"smaller than the minimum allowed one (100 ms).");
-		}
+		Preconditions.checkArgument(
+			watchType == FileProcessingMode.PROCESS_ONCE || interval >= MIN_MONITORING_INTERVAL,
+			"The specified monitoring interval (" + interval + " ms) is smaller than the minimum " +
+				"allowed one (" + MIN_MONITORING_INTERVAL + " ms)."
+		);
+
 		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
 		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
 
@@ -105,16 +112,17 @@ public class ContinuousFileMonitoringFunction<OUT>
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void open(Configuration parameters) throws Exception {
-		LOG.info("Opening File Monitoring Source.");
-
 		super.open(parameters);
 		format.configure(parameters);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Opened File Monitoring Source for path: " + path + ".");
+		}
 	}
 
 	@Override
-	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
+	public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
 		FileSystem fileSystem = FileSystem.get(new URI(path));
 
 		checkpointLock = context.getCheckpointLock();
@@ -146,104 +154,61 @@ public class ContinuousFileMonitoringFunction<OUT>
 		}
 	}
 
-	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
-		assert (Thread.holdsLock(checkpointLock));
-
-		List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs);
-
-		Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator();
-		while (it.hasNext()) {
-			forwardSplits(it.next(), context);
-			it.remove();
-		}
-	}
-
-	private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
+	private void monitorDirAndForwardSplits(FileSystem fs,
+											SourceContext<TimestampedFileInputSplit> context) throws IOException {
 		assert (Thread.holdsLock(checkpointLock));
 
-		Long modTime = splitsToFwd.f0;
-		List<FileInputSplit> splits = splitsToFwd.f1;
+		Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs);
+		Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
 
-		Iterator<FileInputSplit> it = splits.iterator();
-		while (it.hasNext()) {
-			FileInputSplit split = it.next();
-			processSplit(split, context);
-			it.remove();
-		}
-
-		// update the global modification time
-		if (modTime >= globalModificationTime) {
-			globalModificationTime = modTime;
-		}
-	}
-
-	private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> context) {
-		LOG.info("Forwarding split: " + split);
-		context.collect(split);
-	}
-
-	private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
-		List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
-		if (eligibleFiles.isEmpty()) {
-			return new ArrayList<>();
-		}
-
-		Map<Long, List<FileInputSplit>> splitsToForward = getInputSplits(eligibleFiles);
-		List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<>();
-
-		for (Map.Entry<Long, List<FileInputSplit>> entry : splitsToForward.entrySet()) {
-			sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue()));
-		}
-
-		Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
-			@Override
-			public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
-				return (int) (o1.f0 - o2.f0);
+		for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
+			long modificationTime = splits.getKey();
+			for (TimestampedFileInputSplit split: splits.getValue()) {
+				LOG.info("Forwarding split: " + split);
+				context.collect(split);
 			}
-		});
-
-		return sortedSplitsToForward;
+			// update the global modification time
+			globalModificationTime = Math.max(globalModificationTime, modificationTime);
+		}
 	}
 
 	/**
-	 * Creates the input splits for the path to be forwarded to the downstream tasks of the
-	 * {@link ContinuousFileReaderOperator}. Those tasks are going to read their contents for further
-	 * processing. Splits belonging to files in the {@code eligibleFiles} list are the ones
-	 * that are shipped for further processing.
+	 * Creates the input splits to be forwarded to the downstream tasks of the
+	 * {@link ContinuousFileReaderOperator}. Splits are sorted <b>by modification time</b> before
+	 * being forwarded, and only splits belonging to files in the {@code eligibleFiles}
+	 * map will be processed.
 	 * @param eligibleFiles The files to process.
 	 */
-	private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
+	private Map<Long, List<TimestampedFileInputSplit>> getInputSplitsSortedByModTime(
+				Map<Path, FileStatus> eligibleFiles) throws IOException {
+
+		Map<Long, List<TimestampedFileInputSplit>> splitsByModTime = new TreeMap<>();
 		if (eligibleFiles.isEmpty()) {
-			return new HashMap<>();
+			return splitsByModTime;
 		}
 
-		FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
-
-		Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
-		for (FileInputSplit split: inputSplits) {
-			for (FileStatus file: eligibleFiles) {
-				if (file.getPath().equals(split.getPath())) {
-					Long modTime = file.getModificationTime();
-
-					List<FileInputSplit> splitsToForward = splitsPerFile.get(modTime);
-					if (splitsToForward == null) {
-						splitsToForward = new LinkedList<>();
-						splitsPerFile.put(modTime, splitsToForward);
-					}
-					splitsToForward.add(split);
-					break;
+		for (FileInputSplit split: format.createInputSplits(readerParallelism)) {
+			FileStatus fileStatus = eligibleFiles.get(split.getPath());
+			if (fileStatus != null) {
+				Long modTime = fileStatus.getModificationTime();
+				List<TimestampedFileInputSplit> splitsToForward = splitsByModTime.get(modTime);
+				if (splitsToForward == null) {
+					splitsToForward = new ArrayList<>();
+					splitsByModTime.put(modTime, splitsToForward);
 				}
+				splitsToForward.add(new TimestampedFileInputSplit(
+					modTime, split.getSplitNumber(), split.getPath(),
+					split.getStart(), split.getLength(), split.getHostnames()));
 			}
 		}
-		return splitsPerFile;
+		return splitsByModTime;
 	}
 
 	/**
-	 * Returns the files that have data to be processed. This method returns the
-	 * Paths to the aforementioned files. It is up to the {@link #processSplit(FileInputSplit, SourceContext)}
-	 * method to decide which parts of the file to be processed, and forward them downstream.
+	 * Returns the paths of the files not yet processed.
+	 * @param fileSystem The filesystem where the monitored directory resides.
 	 */
-	private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+	private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
 
 		final FileStatus[] statuses;
 		try {
@@ -251,20 +216,20 @@ public class ContinuousFileMonitoringFunction<OUT>
 		} catch (IOException e) {
 			// we may run into an IOException if files are moved while listing their status
 			// delay the check for eligible files in this case
-			return Collections.emptyList();
+			return Collections.emptyMap();
 		}
 
 		if (statuses == null) {
 			LOG.warn("Path does not exist: {}", path);
-			return Collections.emptyList();
+			return Collections.emptyMap();
 		} else {
-			List<FileStatus> files = new ArrayList<>();
+			Map<Path, FileStatus> files = new HashMap<>();
 			// handle the new files
 			for (FileStatus status : statuses) {
 				Path filePath = status.getPath();
 				long modificationTime = status.getModificationTime();
 				if (!shouldIgnore(filePath, modificationTime)) {
-					files.add(status);
+					files.put(filePath, status);
 				}
 			}
 			return files;
@@ -273,19 +238,19 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	/**
 	 * Returns {@code true} if the file is NOT to be processed further.
-	 * This happens in the following cases:
-	 *
-	 * If the user-specified path filtering method returns {@code true} for the file,
-	 * or if the modification time of the file is smaller than the {@link #globalModificationTime}, which
-	 * is the time of the most recent modification found in any of the already processed files.
+	 * This happens if the modification time of the file is smaller than
+	 * the {@link #globalModificationTime}.
+	 * @param filePath the path of the file to check.
+	 * @param modificationTime the modification time of the file.
 	 */
 	private boolean shouldIgnore(Path filePath, long modificationTime) {
 		assert (Thread.holdsLock(checkpointLock));
 		boolean shouldIgnore = modificationTime <= globalModificationTime;
-		if (shouldIgnore) {
-			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
+		if (shouldIgnore && LOG.isDebugEnabled()) {
+			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
+				" and global mod time= " + globalModificationTime);
 		}
-		return  shouldIgnore;
+		return shouldIgnore;
 	}
 
 	@Override
@@ -295,7 +260,10 @@ public class ContinuousFileMonitoringFunction<OUT>
 			globalModificationTime = Long.MAX_VALUE;
 			isRunning = false;
 		}
-		LOG.info("Closed File Monitoring Source.");
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Closed File Monitoring Source for path: " + path + ".");
+		}
 	}
 
 	@Override
@@ -316,7 +284,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	@Override
 	public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		return globalModificationTime;
+		return this.globalModificationTime;
 	}
 
 	@Override

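As the new class-level javadoc above notes, which files the monitoring function picks up is
determined by the FileProcessingMode and by the FilePathFilter configured on the input format.
A small sketch of plugging in a custom filter is shown below; the semantics follow the tests
(returning true from filterPath() means "ignore this path"), and the name prefixes used here
are only an example that roughly mimics the default filter.

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    class FilterSketch {

        // Configures an illustrative filter on the format the monitoring function will scan with.
        static TextInputFormat formatWithFilter(String monitoredPath) {
            TextInputFormat format = new TextInputFormat(new Path(monitoredPath));
            format.setFilesFilter(new FilePathFilter() {
                @Override
                public boolean filterPath(Path filePath) {
                    // returning true means the monitoring function will not forward splits for this file
                    return filePath.getName().startsWith(".") || filePath.getName().startsWith("_");
                }
            });
            return format;
        }
    }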
