flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-1389] Allow changing the filenames of the files created when writing to a directory
Date Mon, 26 Jan 2015 10:30:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master ad31f6111 -> 268ff7a06


[FLINK-1389] Allow changing the filenames of the files created when writing to a directory

This closes #301


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/268ff7a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/268ff7a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/268ff7a0

Branch: refs/heads/master
Commit: 268ff7a067bfa0e4985358afe7f7091a6abbd3c4
Parents: ad31f61
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Jan 12 16:40:34 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Jan 26 11:29:36 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  7 +++-
 .../flink/api/avro/AvroOutputFormatTest.java    |  5 +++
 flink-addons/flink-tachyon/pom.xml              |  6 +++
 .../java/org/apache/flink/tachyon/HDFSTest.java | 41 +++++++++++++++++++
 .../flink/api/common/io/FileOutputFormat.java   | 18 +++++---
 .../api/common/io/FileOutputFormatTest.java     | 43 +++++++++++++++++++-
 6 files changed, 111 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 08fe8d9..d00dbf7 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -41,7 +41,6 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 
 	private transient DataFileWriter<E> dataFileWriter;
 
-
 	public AvroOutputFormat(Path filePath, Class<E> type) {
 		super(filePath);
 		this.avroValueType = type;
@@ -51,6 +50,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 		this.avroValueType = type;
 	}
 
+	@Override
+	protected String getDirectoryFileName(int taskNumber) {
+		return super.getDirectoryFileName(taskNumber) + ".avro";
+	}
+
 	public void setSchema(Schema schema) {
 		this.userDefinedSchema = schema;
 	}
@@ -63,6 +67,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
 		super.open(taskNumber, numTasks);
+
 		DatumWriter<E> datumWriter;
 		Schema schema = null;
 		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
{

http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index db235c0..42c1702 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -84,12 +84,17 @@ public class AvroOutputFormatTest extends JavaProgramTestBase {
 		File file1 = asFile(outputPath1);
 		if (file1.isDirectory()) {
 			output1 = file1.listFiles();
+			// check for avro ext in dir.
+			for (File avroOutput : output1) {
+				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+			}
 		} else {
 			output1 = new File[] {file1};
 		}
 		List<String> result1 = new ArrayList<String>();
 		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
 		for (File avroOutput : output1) {
+
 			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput,
userDatumReader1);
 			while (dataFileReader1.hasNext()) {
 				User user = dataFileReader1.next();

http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-addons/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/pom.xml b/flink-addons/flink-tachyon/pom.xml
index 82eaa1d..de36546 100644
--- a/flink-addons/flink-tachyon/pom.xml
+++ b/flink-addons/flink-tachyon/pom.xml
@@ -49,6 +49,12 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.tachyonproject</groupId>
 			<artifactId>tachyon</artifactId>
 			<version>0.5.0</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 8d29ea7..7318894 100644
--- a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -18,13 +18,19 @@
 
 package org.apache.flink.tachyon;
 
+
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -116,4 +122,39 @@ public class HDFSTest {
 			Assert.fail("Error in test: " + e.getMessage() );
 		}
 	}
+
+	@Test
+	public void testAvroOut() {
+		String type = "one";
+		AvroOutputFormat<String> avroOut =
+				new AvroOutputFormat<String>( String.class );
+
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+		avroOut.setOutputFilePath(new Path(result.toString()));
+		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+		try {
+			avroOut.open(0, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+			avroOut.open(1, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			FileStatus[] files = hdfs.listStatus(result);
+			Assert.assertEquals(2, files.length);
+			for(FileStatus file : files) {
+				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+			}
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 10610f8..08d0be2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -91,7 +91,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	 * The key under which the name of the target path is stored in the configuration. 
 	 */
 	public static final String FILE_PARAMETER_KEY = "flink.output.file";
-	
+
 	/**
 	 * The path of the file to be written.
 	 */
@@ -106,7 +106,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	 * The output directory mode
 	 */
 	private OutputDirectoryMode outputDirectoryMode;
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	/** The stream to which the data is written; */
@@ -163,7 +163,8 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	public OutputDirectoryMode getOutputDirectoryMode() {
 		return this.outputDirectoryMode;
 	}
-	
+
+
 	// ----------------------------------------------------------------
 
 	@Override
@@ -235,10 +236,11 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 				}
 			}
 		}
-			
-			
+
+
+
 		// Suffix the path with the parallel instance index, if needed
-		this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS)
? p.suffix("/" + (taskNumber+1)) : p;
+		this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS)
? p.suffix("/" + getDirectoryFileName(taskNumber)) : p;
 
 		// create output file
 		this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
@@ -247,6 +249,10 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 		this.fileCreated = true;
 	}
 
+	protected String getDirectoryFileName(int taskNumber) {
+		return Integer.toString(taskNumber + 1);
+	}
+
 	@Override
 	public void close() throws IOException {
 		final FSDataOutputStream s = this.stream;

http://git-wip-us.apache.org/repos/asf/flink/blob/268ff7a0/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index 6cdf731..43ded56 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -144,8 +144,38 @@ public class FileOutputFormatTest {
 		Assert.assertTrue(!exception);
 		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
 		Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-		
+
+		// check custom file name inside directory if directory exists
+		(new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
+		dfof = new DummyFileOutputFormat();
+		dfof.setOutputFilePath(new Path(tmpFilePath));
+		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+		dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+		dfof.testFileName = true;
+		Configuration c = new Configuration();
+		dfof.configure(c);
+
+		exception = false;
+		try {
+			dfof.open(0, 1);
+			dfof.close();
+		} catch (Exception e) {
+			exception = true;
+		}
+		File customOutFile = new File(tmpOutPath.getAbsolutePath()+"/fancy-1-0.avro");
+		Assert.assertTrue(!exception);
+		Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+		Assert.assertTrue(customOutFile.exists() && customOutFile.isFile());
+		customOutFile.delete();
+
 		// check fail if file in directory exists
+		// create file for test
+		customOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+		try {
+			customOutFile.createNewFile();
+		} catch (IOException e) {
+			Assert.fail("Error creating file");
+		}
 		dfof = new DummyFileOutputFormat();
 		dfof.setOutputFilePath(new Path(tmpFilePath));
 		dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -562,11 +592,20 @@ public class FileOutputFormatTest {
 	
 	public static class DummyFileOutputFormat extends FileOutputFormat<IntValue> {
 		private static final long serialVersionUID = 1L;
-
+		public boolean testFileName = false;
 		@Override
 		public void writeRecord(IntValue record) throws IOException {
 			// DO NOTHING
 		}
+
+		@Override
+		protected String getDirectoryFileName(int taskNumber) {
+			if(testFileName) {
+				return "fancy-"+(taskNumber+1)+"-"+taskNumber+".avro";
+			} else {
+				return super.getDirectoryFileName(taskNumber);
+			}
+		}
 	}
 	
 }


Mime
View raw message