flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3956] Make FileInputFormats independent from Configuration
Date Wed, 20 Jul 2016 16:01:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6ab96a7bb -> 25b6f2249


[FLINK-3956] Make FileInputFormats independent from Configuration

Parameters of some input formats that could previously be set only
through the Configuration object now have setter methods, so users
can set them directly.

Values set through the setters are not overwritten by the
Configuration object.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25b6f224
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25b6f224
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25b6f224

Branch: refs/heads/master
Commit: 25b6f22490550f0e1a6defef6148c35b69c009de
Parents: 6ab96a7
Author: kl0u <kkloudas@gmail.com>
Authored: Mon Jul 18 18:18:43 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Jul 20 17:59:35 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/io/BinaryInputFormat.java  | 23 ++++++--
 .../api/common/io/DelimitedInputFormat.java     | 38 ++++++++-----
 .../flink/api/common/io/FileInputFormat.java    | 53 ++++++++++-------
 .../api/common/io/BinaryInputFormatTest.java    |  5 +-
 .../api/common/io/SerializedFormatTest.java     |  2 +-
 .../flink/api/java/io/TextInputFormatTest.java  | 60 +++++++++++++++++++-
 .../api/java/io/TypeSerializerFormatTest.java   |  2 +-
 .../environment/StreamExecutionEnvironment.java | 15 +++--
 8 files changed, 144 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 14280d9..d45a767 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -86,14 +86,27 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
 
-		// read own parameters
-		this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
-		if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) {
+		// the if is to prevent the configure() method from
+		// overwriting the value set by the setter
+
+		if (this.blockSize == NATIVE_BLOCK_SIZE) {
+			long blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
+			setBlockSize(blockSize);
+		}
+	}
+
+	public void setBlockSize(long blockSize) {
+		if (blockSize < 1 && blockSize != NATIVE_BLOCK_SIZE) {
 			throw new IllegalArgumentException("The block size parameter must be set and larger than
0.");
 		}
-		if (this.blockSize > Integer.MAX_VALUE) {
-			throw new UnsupportedOperationException("Currently only block size up to Integer.MAX_VALUE
are supported");
+		if (blockSize > Integer.MAX_VALUE) {
+			throw new UnsupportedOperationException("Currently only block sizes up to Integer.MAX_VALUE
are supported");
 		}
+		this.blockSize = blockSize;
+	}
+
+	public long getBlockSize() {
+		return this.blockSize;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 59c1730..99aa022 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
@@ -181,7 +182,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
imple
 		if (delimiter == null) {
 			throw new IllegalArgumentException("Delimiter must not be null");
 		}
-		
 		this.delimiter = delimiter;
 	}
 
@@ -190,6 +190,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
imple
 	}
 	
 	public void setDelimiter(String delimiter) {
+		if (delimiter == null) {
+			throw new IllegalArgumentException("Delimiter must not be null");
+		}
 		this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
 	}
 	
@@ -225,7 +228,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
imple
 		if (numLineSamples < 0) {
 			throw new IllegalArgumentException("Number of line samples must not be negative.");
 		}
-		
 		this.numLineSamples = numLineSamples;
 	}
 
@@ -260,23 +262,29 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
imple
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
-		
-		String delimString = parameters.getString(RECORD_DELIMITER, null);
-		if (delimString != null) {
-			setDelimiter(delimString);
+
+		// the if() clauses are to prevent the configure() method from
+		// overwriting the values set by the setters
+
+		if (Arrays.equals(delimiter, new byte[] {'\n'})) {
+			String delimString = parameters.getString(RECORD_DELIMITER, null);
+			if (delimString != null) {
+				setDelimiter(delimString);
+			}
 		}
 		
 		// set the number of samples
-		String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);
-		if (samplesString != null) {
-			try {
-				setNumLineSamples(Integer.parseInt(samplesString));
-			}
-			catch (NumberFormatException e) {
-				if (LOG.isWarnEnabled()) {
-					LOG.warn("Invalid value for number of samples to take: " + samplesString + ". Skipping
sampling.");
+		if (numLineSamples == NUM_SAMPLES_UNDEFINED) {
+			String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);
+			if (samplesString != null) {
+				try {
+					setNumLineSamples(Integer.parseInt(samplesString));
+				} catch (NumberFormatException e) {
+					if (LOG.isWarnEnabled()) {
+						LOG.warn("Invalid value for number of samples to take: " + samplesString + ". Skipping
sampling.");
+					}
+					setNumLineSamples(0);
 				}
-				setNumLineSamples(0);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index fd69cc3..95a1ffa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -242,27 +242,31 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT,
FileInputS
 
 	public void setFilePath(String filePath) {
 		if (filePath == null) {
-			throw new IllegalArgumentException("File path may not be null.");
+			throw new IllegalArgumentException("File path cannot be null.");
 		}
-		
+
 		// TODO The job-submission web interface passes empty args (and thus empty
 		// paths) to compute the preview graph. The following is a workaround for
 		// this situation and we should fix this.
-		
+
 		// comment (Stephan Ewen) this should be no longer relevant with the current Java/Scalal
APIs.
 		if (filePath.isEmpty()) {
 			setFilePath(new Path());
 			return;
 		}
-		
-		setFilePath(new Path(filePath));
+
+		try {
+			this.filePath = new Path(filePath);
+		} catch (RuntimeException rex) {
+			throw new RuntimeException("Could not create a valid URI from the given file path name:
" + rex.getMessage());
+		}
 	}
 	
 	public void setFilePath(Path filePath) {
 		if (filePath == null) {
-			throw new IllegalArgumentException("File path may not be null.");
+			throw new IllegalArgumentException("File path must not be null.");
 		}
-		
+
 		this.filePath = filePath;
 	}
 	
@@ -274,7 +278,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT,
FileInputS
 		if (minSplitSize < 0) {
 			throw new IllegalArgumentException("The minimum split size cannot be negative.");
 		}
-		
+
 		this.minSplitSize = minSplitSize;
 	}
 	
@@ -301,6 +305,14 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT,
FileInputS
 		this.openTimeout = openTimeout;
 	}
 
+	public void setNestedFileEnumeration(boolean enable) {
+		this.enumerateNestedFiles = enable;
+	}
+
+	public boolean getNestedFileEnumeration() {
+		return this.enumerateNestedFiles;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Getting information about the split that is currently open
 	// --------------------------------------------------------------------------------------------
@@ -334,23 +346,20 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT,
FileInputS
 	 */
 	@Override
 	public void configure(Configuration parameters) {
-		// get the file path
-		String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
-		if (filePath != null) {
-			try {
-				this.filePath = new Path(filePath);
-			}
-			catch (RuntimeException rex) {
-				throw new RuntimeException("Could not create a valid URI from the given file path name:
" + rex.getMessage()); 
-			}
+
+		// the if() clauses are to prevent the configure() method from
+		// overwriting the values set by the setters
+
+		if (filePath == null) {
+			String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
+			setFilePath(filePath);
 		}
-		else if (this.filePath == null) {
-			throw new IllegalArgumentException("File path was not specified in input format, or configuration.");

+
+		if (!this.enumerateNestedFiles) {
+			this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
 		}
-		
-		this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
 	}
-	
+
 	/**
 	 * Obtains basic file statistics containing only file size. If the input is a directory,
then the size is the sum of all contained files.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index e816795..90b366c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -57,10 +57,11 @@ public class BinaryInputFormatTest {
 		fileOutputStream.close();
 
 		final Configuration config = new Configuration();
-		config.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
-		
+		config.setLong("input.block_size", blockSize + 10);
+
 		final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat();
 		inputFormat.setFilePath(tempFile.toURI().toString());
+		inputFormat.setBlockSize(blockSize);
 		
 		inputFormat.configure(config);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index db2ee8c..ac1e19c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -49,10 +49,10 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record>
{
 	@Override
 	protected BinaryInputFormat<Record> createInputFormat() {
 		Configuration configuration = new Configuration();
-		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
 
 		final SerializedInputFormat<Record> inputFormat = new SerializedInputFormat<Record>();
 		inputFormat.setFilePath(this.tempFile.toURI().toString());
+		inputFormat.setBlockSize(this.blockSize);
 
 		inputFormat.configure(configuration);
 		return inputFormat;

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 62a7cf5..3cb7505 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -30,6 +30,9 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -83,9 +86,62 @@ public class TextInputFormatTest {
 			fail("Test erroneous");
 		}
 	}
-	
+
+	@Test
+	public void testNestedFileRead() {
+		String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
+		List<String> expectedFiles = new ArrayList<>();
+
+		try {
+			for (String dir: dirs) {
+				// create input file
+				File tmpDir = new File(dir);
+				if (!tmpDir.exists()) {
+					tmpDir.mkdirs();
+				}
+
+				File tempFile = File.createTempFile("TextInputFormatTest", ".tmp", tmpDir);
+				tempFile.deleteOnExit();
+
+				expectedFiles.add("file:" + tempFile.getAbsolutePath());
+			}
+			File parentDir = new File("tmp");
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI().toString()));
+			inputFormat.setNestedFileEnumeration(true);
+			inputFormat.setNumLineSamples(10);
+
+			// this is to check if the setter overrides the configuration (as expected)
+			Configuration config = new Configuration();
+			config.setBoolean("recursive.file.enumeration", false);
+			config.setString("delimited-format.numSamples", "20");
+			inputFormat.configure(config);
+
+			assertTrue(inputFormat.getNestedFileEnumeration());
+			assertTrue(inputFormat.getNumLineSamples() == 10);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size());
+
+			List<String> paths = new ArrayList<>();
+			for (FileInputSplit split: splits) {
+				paths.add(split.getPath().toString());
+			}
+
+			Collections.sort(expectedFiles);
+			Collections.sort(paths);
+			for (int i = 0; i < expectedFiles.size(); i++) {
+				assertTrue(expectedFiles.get(i).equals(paths.get(i)));
+			}
+
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
 	/**
-	 * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should
be removed 
+	 * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should
be removed
 	 */
 	@Test
 	public void testRemovingTrailingCR() {

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index 35c564b..a119d59 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -63,11 +63,11 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
 	@Override
 	protected BinaryInputFormat<Tuple2<Integer, String>> createInputFormat() {
 		Configuration configuration = new Configuration();
-		configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
 
 		final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new
 				TypeSerializerInputFormat<Tuple2<Integer, String>>(resultType);
 		inputFormat.setFilePath(this.tempFile.toURI().toString());
+		inputFormat.setBlockSize(this.blockSize);
 
 		inputFormat.configure(configuration);
 		return inputFormat;

http://git-wip-us.apache.org/repos/asf/flink/blob/25b6f224/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a2a38d4..53053b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -913,7 +913,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName)
{
-		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
 
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
@@ -1076,6 +1077,7 @@ public abstract class StreamExecutionEnvironment {
 
 		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
 		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
 
 		inputFormat.setFilePath(filePath);
 		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter,
interval);
@@ -1222,23 +1224,24 @@ public abstract class StreamExecutionEnvironment {
 	private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT>
inputFormat,
 														TypeInformation<OUT> typeInfo,
 														String sourceName,
-														FileProcessingMode watchType,
+														FileProcessingMode monitoringMode,
 														FilePathFilter pathFilter,
 														long interval) {
 
 		Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
 		Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
 		Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
-		Preconditions.checkNotNull(watchType, "Unspecified watchtype.");
+		Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
 		Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function.");
 
-		Preconditions.checkArgument(watchType.equals(FileProcessingMode.PROCESS_ONCE) ||
+		Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
 			interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
-			"The path monitoring interval cannot be less than 100 ms.");
+			"The path monitoring interval cannot be less than " +
+				ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
 
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
 			inputFormat, inputFormat.getFilePath().toString(),
-			pathFilter, watchType, getParallelism(), interval);
+			pathFilter, monitoringMode, getParallelism(), interval);
 
 		ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
 


Mime
View raw message