flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] git commit: Small bug fixes for running hadoop output formats
Date Fri, 15 Aug 2014 11:08:43 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master a87559aee -> 653216ba8


Small bug fixes for running hadoop output formats

This closes #75


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/653216ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/653216ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/653216ba

Branch: refs/heads/master
Commit: 653216ba83910ead811cfddf525b0448325882a3
Parents: d0cead7
Author: twalthr <info@twalthr.com>
Authored: Mon Jul 21 12:11:16 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200

----------------------------------------------------------------------
 .../mapred/HadoopOutputFormat.java                  | 10 ++++++----
 .../mapreduce/HadoopOutputFormat.java               | 16 ++++++++++------
 2 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index deae026..69a0b50 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -100,16 +100,18 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 				+ Integer.toString(taskNumber + 1) 
 				+ "_0");
 		
+		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+		
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 		
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		
 		this.fileOutputCommitter = new FileOutputCommitter();
 		
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 9eabc03..f071eda 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -103,14 +103,17 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 				+ Integer.toString(taskNumber + 1) 
 				+ "_0");
 		
+		this.configuration.set("mapred.task.id", taskAttemptID.toString());
+		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+		
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
 		
 		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
 		
@@ -157,11 +160,12 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		}
 		this.fileOutputCommitter.commitJob(this.context);
 		
-		// rename tmp-* files to final name
-		FileSystem fs = FileSystem.get(this.configuration);
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
+		
+		// rename tmp-* files to final name
+		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+		
 		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
 		
 		// isDirectory does not work in hadoop 1


Mime
View raw message