flink-commits mailing list archives

From fhue...@apache.org
Subject [2/2] incubator-flink git commit: [FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1
Date Mon, 08 Dec 2014 09:30:15 GMT
[FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1

This closes #173
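Background: the fix works by implementing Flink's FinalizeOnMaster interface (org.apache.flink.api.common.io.FinalizeOnMaster), whose finalizeGlobal(int parallelism) hook is invoked exactly once on the master after all parallel sink tasks have finished. Previously, every parallel instance of HadoopOutputFormat called Hadoop's commitJob() from close(), which conflicts when the degree of parallelism (DOP) is greater than 1; the diff below moves the job-level commit into finalizeGlobal(). A minimal sketch of the pattern, for orientation only — the class name MySinkFormat and its comments are illustrative and not part of this commit:

    import java.io.IOException;

    import org.apache.flink.api.common.io.FinalizeOnMaster;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical format illustrating the pattern applied in this commit.
    public class MySinkFormat implements OutputFormat<String>, FinalizeOnMaster {

        private static final long serialVersionUID = 1L;

        @Override
        public void configure(Configuration parameters) {
            // read format-specific parameters
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            // called once per parallel sink instance
        }

        @Override
        public void writeRecord(String record) throws IOException {
            // write one record of this task's partition
        }

        @Override
        public void close() throws IOException {
            // per-task cleanup only, e.g. committing this task's own output;
            // committing the whole job here breaks with DOP > 1 because every
            // parallel instance would attempt the job-level commit
        }

        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
            // invoked exactly once on the master after all tasks have closed:
            // the safe place for job-wide work such as Hadoop's commitJob()
        }
    }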


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15f58bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15f58bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15f58bb2

Branch: refs/heads/master
Commit: 15f58bb23c657951e2de48e6436820a093b393e7
Parents: 63eeafc
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Oct 6 16:28:00 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Dec 8 10:26:55 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopOutputFormat.java              | 20 ++++++-
 .../mapreduce/HadoopOutputFormat.java           | 52 +++++++++++------
 .../mapreduce/example/WordCount.java            |  1 -
 .../mapred/HadoopMapredITCase.java              |  2 +-
 .../mapreduce/HadoopInputOutputITCase.java      |  3 +-
 .../common/operators/GenericDataSinkBase.java   | 12 ++++
 .../flink/test/util/AbstractTestBase.java       | 60 +++++++++++++++-----
 7 files changed, 113 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index 434b409..64c539b 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -40,11 +41,11 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K,V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
 	
 	private static final long serialVersionUID = 1L;
 	
-	private JobConf jobConf;	
+	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;	
 	private transient RecordWriter<K,V> recordWriter;	
 	private transient FileOutputCommitter fileOutputCommitter;
@@ -141,7 +142,20 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
 			this.fileOutputCommitter.commitTask(this.context);
 		}
-		this.fileOutputCommitter.commitJob(this.jobContext);
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		try {
+			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			
+			// finalize HDFS output format
+			fileOutputCommitter.commitJob(jobContext);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index fa8823b..402372c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -22,18 +22,17 @@ package org.apache.flink.hadoopcompatibility.mapreduce;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -41,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K,V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 	private transient RecordWriter<K,V> recordWriter;
 	private transient FileOutputCommitter fileOutputCommitter;
 	private transient TaskAttemptContext context;
+	private transient int taskNumber;
 	
 	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
 		super();
@@ -95,6 +95,8 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 			throw new IOException("Task id too large.");
 		}
 		
+		this.taskNumber = taskNumber+1;
+		
 		// for hadoop 2.2
 		this.configuration.set("mapreduce.output.basename", "tmp");
 		
@@ -158,28 +160,42 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
 			this.fileOutputCommitter.commitTask(this.context);
 		}
-		this.fileOutputCommitter.commitJob(this.context);
-		
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
 		
-		// rename tmp-* files to final name
+		// rename tmp-file to final name
 		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
 		
-		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
+		String taskNumberStr = Integer.toString(this.taskNumber);
+		String tmpFileTemplate = "tmp-r-00000";
+		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
 		
-		// isDirectory does not work in hadoop 1
-		if(fs.getFileStatus(outputPath).isDir()) {
-			FileStatus[] files = fs.listStatus(outputPath);
+		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+		}
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		JobContext jobContext;
+		TaskAttemptContext taskContext;
+		try {
+			
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+					+ Integer.toString(1) 
+					+ "_0");
 			
-			for(FileStatus f : files) {
-				Matcher m = p.matcher(f.getPath().getName());
-				if(m.matches()) {
-					int part = Integer.valueOf(m.group(2));
-					fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
-				}
-			}
+			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
 		}
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+		
+		// finalize HDFS output format
+		this.fileOutputCommitter.commitJob(jobContext);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 53a6e6b..2b99fd2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -55,7 +55,6 @@ public class WordCount {
 		final String outputPath = args[1];
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
 		
 		// Set up the Hadoop Input Format
 		Job job = Job.getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index f8c3bab..b6650d2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -36,7 +36,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 000dc58..7eee629 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -32,11 +32,12 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
+		this.setDegreeOfParallelism(4);
 	}
 	
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index ca55855..a0f367e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -22,6 +22,8 @@ package org.apache.flink.api.common.operators;
 import java.util.List;
 
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -292,13 +294,23 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	
 	protected void executeOnCollections(List<IN> inputData) throws Exception {
 		OutputFormat<IN> format = this.formatWrapper.getUserCodeObject();
+		
+		if(format instanceof InitializeOnMaster) {
+			((InitializeOnMaster)format).initializeGlobal(1);
+		}
+		
 		format.configure(this.parameters);
 		
 		format.open(0, 1);
 		for (IN element : inputData) {
 			format.writeRecord(element);
 		}
+		
 		format.close();
+		
+		if(format instanceof FinalizeOnMaster) {
+			((FinalizeOnMaster)format).finalizeGlobal(1);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c043ea8..1fd0e6e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -199,11 +200,11 @@ public abstract class AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	public BufferedReader[] getResultReader(String resultPath) throws IOException {
-		return getResultReader(resultPath, false);
+		return getResultReader(resultPath, new String[]{}, false);
 	}
 	
-	public BufferedReader[] getResultReader(String resultPath, boolean inOrderOfFiles) throws IOException {
-		File[] files = getAllInvolvedFiles(resultPath);
+	public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		
 		if (inOrderOfFiles) {
 			// sort the files after their name (1, 2, 3, 4)...
@@ -232,8 +233,14 @@ public abstract class AbstractTestBase {
 		return readers;
 	}
 	
+	
+	
 	public BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
-		File[] files = getAllInvolvedFiles(resultPath);
+		return getResultInputStream(resultPath, new String[]{});
+	}
+	
+	public BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
 		for (int i = 0; i < files.length; i++) {
 			inStreams[i] = new BufferedInputStream(new FileInputStream(files[i]));
@@ -242,11 +249,15 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void readAllResultLines(List<String> target, String resultPath) throws IOException {
-		readAllResultLines(target, resultPath, false);
+		readAllResultLines(target, resultPath, new String[]{});
 	}
 	
-	public void readAllResultLines(List<String> target, String resultPath, boolean inOrderOfFiles) throws IOException {
-		for (BufferedReader reader : getResultReader(resultPath, inOrderOfFiles)) {
+	public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException {
+		readAllResultLines(target, resultPath, excludePrefixes, false);
+	}
+	
+	public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+		for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
 			String s = null;
 			while ((s = reader.readLine()) != null) {
 				target.add(s);
@@ -255,8 +266,12 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
+		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
+	}
+	
+	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, false);
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		Arrays.sort(result);
@@ -267,10 +282,13 @@ public abstract class AbstractTestBase {
 		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
 		Assert.assertArrayEquals(expected, result);
 	}
-	
 	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception {
+		compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
+	}
+	
+	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, true);
+		readAllResultLines(list, resultPath, excludePrefixes, true);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		
@@ -281,8 +299,12 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception {
+		compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+	}
+	
+	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, false);
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
@@ -314,13 +336,25 @@ public abstract class AbstractTestBase {
 		}
 	}
 	
-	private File[] getAllInvolvedFiles(String resultPath) {
+	private File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
+		final String[] exPrefs = excludePrefixes;
 		File result = asFile(resultPath);
 		if (!result.exists()) {
 			Assert.fail("Result file was not written");
 		}
 		if (result.isDirectory()) {
-			return result.listFiles();
+			return result.listFiles(new FilenameFilter() {
+				
+				@Override
+				public boolean accept(File dir, String name) {
+					for(String p: exPrefs) {
+						if(name.startsWith(p)) {
+							return false;
+						}
+					}
+					return true;
+				}
+			});
 		} else {
 			return new File[] { result };
 		}

