flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [3/3] flink git commit: [FLINK-1081] FileMonitoringFunction fix for proper handling of modification times
Date Sun, 25 Jan 2015 21:11:13 GMT
[FLINK-1081] FileMonitoringFunction fix for proper handling of modification times

Closes #226


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b63f269
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b63f269
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b63f269

Branch: refs/heads/master
Commit: 9b63f269e7ced1c9cb58bffe0091eb3d86ac6624
Parents: bf5e39a
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Jan 25 20:20:40 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sun Jan 25 21:26:54 2015 +0100

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  9 ++---
 .../function/source/FileMonitoringFunction.java | 36 ++++++++++++--------
 .../api/scala/StreamExecutionEnvironment.scala  | 15 ++++----
 3 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b7eeed2..e0cdd02 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -224,8 +224,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @param filePath
 	 *            The path of the file, as a URI (e.g.,
 	 *            "file:///some/local/file" or "hdfs://host:port/file/path/").
-	 * @param interval
-	 *            The interval of file watching.
+	 * @param intervalMillis
+	 *            The interval of file watching in milliseconds.
 	 * @param watchType
 	 *            The watch type of file stream. When watchType is
 	 *            {@link WatchType.ONLY_NEW_FILES}, the system processes only
@@ -236,9 +236,10 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @return The DataStream containing the given directory.
 	 */
-	public DataStream<String> readFileStream(String filePath, long interval, WatchType
watchType) {
+	public DataStream<String> readFileStream(String filePath, long intervalMillis,
+			WatchType watchType) {
 		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-				filePath, interval, watchType));
+				filePath, intervalMillis, watchType), null, "File Stream");
 
 		return source.flatMap(new FileReadFunction());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
index c223c53..05a2489 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -48,15 +48,14 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String,
Lon
 	private WatchType watchType;
 
 	private FileSystem fileSystem;
-	private long lastModificationTime;
 	private Map<String, Long> offsetOfFiles;
+	private Map<String, Long> modificationTimes;
 
 	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
 		this.path = path;
 		this.interval = interval;
 		this.watchType = watchType;
-
-		this.lastModificationTime = System.currentTimeMillis();
+		this.modificationTimes = new HashMap<String, Long>();
 		this.offsetOfFiles = new HashMap<String, Long>();
 	}
 
@@ -67,7 +66,8 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String,
Lon
 		while (true) {
 			List<String> files = listNewFiles();
 			for (String filePath : files) {
-				if (watchType == WatchType.ONLY_NEW_FILES || watchType == WatchType.REPROCESS_WITH_APPENDED)
{
+				if (watchType == WatchType.ONLY_NEW_FILES
+						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
 					collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
 					offsetOfFiles.put(filePath, -1L);
 				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
@@ -90,28 +90,34 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String,
Lon
 
 	private List<String> listNewFiles() throws IOException {
 		List<String> files = new ArrayList<String>();
+
 		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
 
 		for (FileStatus status : statuses) {
 			Path filePath = status.getPath();
+			String fileName = filePath.getName();
 			long modificationTime = status.getModificationTime();
 
-			if (!isFiltered(filePath, modificationTime)) {
+			if (!isFiltered(fileName, modificationTime)) {
 				files.add(filePath.toString());
+				modificationTimes.put(fileName, modificationTime);
 			}
 		}
-
-		lastModificationTime = System.currentTimeMillis();
-
 		return files;
 	}
 
-	private boolean isFiltered(Path path, long modificationTime) {
-		String filename = path.getName();
-
-		return lastModificationTime > modificationTime // not modified file
-				|| (watchType == WatchType.ONLY_NEW_FILES && offsetOfFiles.containsKey(path.toString()))
// modified file but already processed
-				|| filename.startsWith(".") // hidden file
-				|| filename.contains("_COPYING_"); // currently copying file
+	private boolean isFiltered(String fileName, long modificationTime) {
+
+		if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
+				|| fileName.startsWith(".") || fileName.contains("_COPYING_")) {
+			return true;
+		} else {
+			Long lastModification = modificationTimes.get(fileName);
+			if (lastModification == null) {
+				return false;
+			} else {
+				return lastModification >= modificationTime;
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b63f269/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index b4565c7..e0f50f8 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment
=>
 import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction
}
 import org.apache.flink.util.Collector
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -81,14 +82,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.readTextFile(filePath)
 
   /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise multiple times(infinite). The file will be read with
-   * the system's default character set. This functionality can be used for
-   * testing a topology.
+   * Creates a DataStream that contains the contents of files created while the
+   * system watches the given path. The files will be read with the system's
+   * default character set. The user can set the monitoring interval in milliseconds
+   * and the way file modifications are handled. By default it checks only for new files
+   * every 100 milliseconds.
    *
    */
-  def readTextStream(StreamPath: String): DataStream[String] = 
-    javaEnv.readTextStream(StreamPath)
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType
= 
+    WatchType.ONLY_NEW_FILES): DataStream[String] =
+    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
 
   /**
    * Creates a new DataStream that contains the strings received infinitely


Mime
View raw message