flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [FLINK-4890] [core] Make GlobFilePathFilter work on Windows
Date Thu, 05 Jan 2017 19:38:20 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 9c0c19aae -> 700cbd464


[FLINK-4890] [core] Make GlobFilePathFilter work on Windows


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb48c3b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb48c3b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb48c3b4

Branch: refs/heads/release-1.2
Commit: fb48c3b4cbc5a186cb7b812c8d05833c5852b385
Parents: 91f9a1a
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Jan 5 14:44:00 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 5 17:34:13 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FilePathFilter.java     | 31 ++++++++--
 .../flink/api/common/io/GlobFilePathFilter.java | 24 +++++---
 .../java/org/apache/flink/core/fs/Path.java     | 61 ++++++++++++--------
 .../api/common/io/FileInputFormatTest.java      | 41 ++++++-------
 4 files changed, 94 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb48c3b4/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
index 4ab896c..0979096 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
@@ -29,12 +29,10 @@ import java.io.Serializable;
 @PublicEvolving
 public abstract class FilePathFilter implements Serializable {
 
-	// Name of an unfinished Hadoop file
-	public static final String HADOOP_COPYING = "_COPYING_";
+	private static final long serialVersionUID = 1L;
 
-	public static FilePathFilter createDefaultFilter() {
-		return new DefaultFilter();
-	}
+	/** Name of an unfinished Hadoop file */
+	public static final String HADOOP_COPYING = "_COPYING_";
 
 	/**
 	 * Returns {@code true} if the {@code filePath} given is to be
@@ -50,12 +48,35 @@ public abstract class FilePathFilter implements Serializable {
 	public abstract boolean filterPath(Path filePath);
 
 	/**
+	 * Returns the default filter, which excludes the following files:
+	 * 
+	 * <ul>
+	 *     <li>Files starting with &quot;_&quot;</li>
+	 *     <li>Files starting with &quot;.&quot;</li>
+	 *     <li>Files containing the string &quot;_COPYING_&quot;</li>
+	 * </ul>
+	 * 
+	 * @return The singleton instance of the default file path filter.
+	 */
+	public static FilePathFilter createDefaultFilter() {
+		return DefaultFilter.INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+	//  The default filter
+	// ------------------------------------------------------------------------
+
+	/**
 	 * The default file path filtering method and is used
 	 * if no other such function is provided. This filter leaves out
 	 * files starting with ".", "_", and "_COPYING_".
 	 */
 	public static class DefaultFilter extends FilePathFilter {
 
+		private static final long serialVersionUID = 1L;
+
+		static final DefaultFilter INSTANCE = new DefaultFilter();
+
 		DefaultFilter() {}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48c3b4/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index a3a78ae..0ee6f03 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -52,8 +52,8 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 	private static final long serialVersionUID = 1L;
 
-	private final List<PathMatcher> includeMatchers;
-	private final List<PathMatcher> excludeMatchers;
+	private final ArrayList<PathMatcher> includeMatchers;
+	private final ArrayList<PathMatcher> excludeMatchers;
 
 	/**
 	 * Constructor for GlobFilePathFilter that will match all files
@@ -73,9 +73,9 @@ public class GlobFilePathFilter extends FilePathFilter {
 		excludeMatchers = buildPatterns(excludePatterns);
 	}
 
-	private List<PathMatcher> buildPatterns(List<String> patterns) {
+	private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
 		FileSystem fileSystem = FileSystems.getDefault();
-		List<PathMatcher> matchers = new ArrayList<>();
+		ArrayList<PathMatcher> matchers = new ArrayList<>(patterns.size());
 
 		for (String patternStr : patterns) {
 			matchers.add(fileSystem.getPathMatcher("glob:" + patternStr));
@@ -90,22 +90,28 @@ public class GlobFilePathFilter extends FilePathFilter {
 			return false;
 		}
 
+		// compensate for the fact that Flink paths are slashed
+		final String path = filePath.hasWindowsDrive() ?
+				filePath.getPath().substring(1) :
+				filePath.getPath();
+
+		final java.nio.file.Path nioPath = Paths.get(path);
+
 		for (PathMatcher matcher : includeMatchers) {
-			if (matcher.matches(Paths.get(filePath.getPath()))) {
-				return shouldExclude(filePath);
+			if (matcher.matches(nioPath)) {
+				return shouldExclude(nioPath);
 			}
 		}
 
 		return true;
 	}
 
-	private boolean shouldExclude(Path filePath) {
+	private boolean shouldExclude(java.nio.file.Path nioPath) {
 		for (PathMatcher matcher : excludeMatchers) {
-			if (matcher.matches(Paths.get(filePath.getPath()))) {
+			if (matcher.matches(nioPath)) {
 				return true;
 			}
 		}
 		return false;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48c3b4/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 7adfa42..53290ed 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -274,24 +274,6 @@ public class Path implements IOReadableWritable, Serializable {
 	}
 
 	/**
-	 * Checks if the provided path string contains a windows drive letter.
-	 * 
-	 * @param path
-	 *        the path to check
-	 * @param slashed
-	 *        <code>true</code> to indicate the first character of the string is a slash, <code>false</code> otherwise
-	 * @return <code>true</code> if the path string contains a windows drive letter, <code>false</code> otherwise
-	 */
-	private boolean hasWindowsDrive(String path, boolean slashed) {
-		final int start = slashed ? 1 : 0;
-		return path.length() >= start + 2
-			&& (!slashed || path.charAt(0) == '/')
-			&& path.charAt(start + 1) == ':'
-			&& ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
-				.charAt(start) <= 'z'));
-	}
-
-	/**
 	 * Converts the path object to a {@link URI}.
 	 * 
 	 * @return the {@link URI} object converted from the path object
@@ -376,8 +358,7 @@ public class Path implements IOReadableWritable, Serializable {
 
 	@Override
 	public String toString() {
-		// we can't use uri.toString(), which escapes everything, because we
-		// want
+		// we can't use uri.toString(), which escapes everything, because we want
 		// illegal characters unescaped in the string, for glob processing, etc.
 		final StringBuilder buffer = new StringBuilder();
 		if (uri.getScheme() != null) {
@@ -390,9 +371,7 @@ public class Path implements IOReadableWritable, Serializable {
 		}
 		if (uri.getPath() != null) {
 			String path = uri.getPath();
-			if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
-				// windows
-				// drive
+			if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has windows drive
 				uri.getScheme() == null && // but no scheme
 				uri.getAuthority() == null) { // or authority
 				path = path.substring(1); // remove slash before drive
@@ -476,10 +455,12 @@ public class Path implements IOReadableWritable, Serializable {
 		return new Path(scheme + ":" + "//" + authority + pathUri.getPath());
 	}
 
+	// ------------------------------------------------------------------------
+	//  Legacy Serialization
+	// ------------------------------------------------------------------------
 
 	@Override
 	public void read(DataInputView in) throws IOException {
-
 		final boolean isNotNull = in.readBoolean();
 		if (isNotNull) {
 			final String scheme = StringUtils.readNullableString(in);
@@ -501,7 +482,6 @@ public class Path implements IOReadableWritable, Serializable {
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
-
 		if (uri == null) {
 			out.writeBoolean(false);
 		} else {
@@ -514,6 +494,37 @@ public class Path implements IOReadableWritable, Serializable {
 			StringUtils.writeNullableString(uri.getQuery(), out);
 			StringUtils.writeNullableString(uri.getFragment(), out);
 		}
+	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if the provided path string contains a windows drive letter.
+	 *
+	 * @return True, if the path string contains a windows drive letter, false otherwise.
+	 */
+	public boolean hasWindowsDrive() {
+		return hasWindowsDrive(uri.getPath(), true);
+	}
+
+	/**
+	 * Checks if the provided path string contains a windows drive letter.
+	 *
+	 * @param path
+	 *        the path to check
+	 * @param slashed
+	 *         true to indicate the first character of the string is a slash, false otherwise
+	 * 
+	 * @return <code>true</code> if the path string contains a windows drive letter, false otherwise
+	 */
+	private boolean hasWindowsDrive(String path, boolean slashed) {
+		final int start = slashed ? 1 : 0;
+		return path.length() >= start + 2
+				&& (!slashed || path.charAt(0) == '/')
+				&& path.charAt(start + 1) == ':'
+				&& ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
+				.charAt(start) <= 'z'));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48c3b4/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 3e5d309..5599dd0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -338,36 +338,29 @@ public class FileInputFormatTest {
 	}
 
 	@Test
-	public void testReadMultiplePatterns() {
-		try {
-			final String contents = "CONTENTS";
+	public void testReadMultiplePatterns() throws Exception {
+		final String contents = "CONTENTS";
 
-			// create some accepted, some ignored files
+		// create some accepted, some ignored files
 
-			File child1 = temporaryFolder.newFile("dataFile1.txt");
-			File child2 = temporaryFolder.newFile("another_file.bin");
-			createTempFiles(contents.getBytes(), child1, child2);
+		File child1 = temporaryFolder.newFile("dataFile1.txt");
+		File child2 = temporaryFolder.newFile("another_file.bin");
+		createTempFiles(contents.getBytes(), child1, child2);
 
-			// test that only the valid files are accepted
+		// test that only the valid files are accepted
 
-			Configuration configuration = new Configuration();
+		Configuration configuration = new Configuration();
 
-			final DummyFileInputFormat format = new DummyFileInputFormat();
-			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
-			format.configure(configuration);
-			format.setFilesFilter(new GlobFilePathFilter(
-				Collections.singletonList("**"),
-				Arrays.asList(new String[] {"**/another_file.bin", "**/dataFile1.txt"})
-			));
-			FileInputSplit[] splits = format.createInputSplits(1);
+		final DummyFileInputFormat format = new DummyFileInputFormat();
+		format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+		format.configure(configuration);
+		format.setFilesFilter(new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Arrays.asList("**/another_file.bin", "**/dataFile1.txt")
+		));
+		FileInputSplit[] splits = format.createInputSplits(1);
 
-			Assert.assertEquals(0, splits.length);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
+		Assert.assertEquals(0, splits.length);
 	}
 
 	@Test


Mime
View raw message