flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.
Date Mon, 27 Apr 2015 21:37:22 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 5cde1e943 -> ffc86f668


[FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.

Backported fix.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffc86f66
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffc86f66
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffc86f66

Branch: refs/heads/release-0.8
Commit: ffc86f6686520b8ed4270ccd76ac304f64368c6e
Parents: 5cde1e9
Author: fpompermaier <f.pompermaier@gmail.com>
Authored: Mon Apr 27 16:38:51 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Apr 27 23:36:21 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java   | 6 ++++--
 .../api/java/hadoop/mapreduce/HadoopOutputFormatBase.java      | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffc86f66/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a59b96f..a6a318c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 	private transient JobContext jobContext;
 
 	public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
-		super();
 		this.mapredOutputFormat = mapredOutputFormat;
 		HadoopUtils.mergeHadoopConf(job);
 		this.jobConf = job;
@@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		if(this.mapredOutputFormat instanceof Configurable){
+			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ffc86f66/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index a7ae428..32a2e0e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -66,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		if(this.mapreduceOutputFormat instanceof Configurable){
+			((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+		}
 	}
 
 	/**


Mime
View raw message