flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-2353] Respect JobConfigurable interface in Hadoop mapred wrappers
Date Tue, 14 Jul 2015 09:14:47 GMT
Repository: flink
Updated Branches:
  refs/heads/master 84b39dcb5 -> 91675296e


[FLINK-2353] Respect JobConfigurable interface in Hadoop mapred wrappers

This closes #908


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91675296
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91675296
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91675296

Branch: refs/heads/master
Commit: 91675296e1d0fcc503ab3af9c5da7fdb83b78fc5
Parents: 84b39dc
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Jul 14 00:59:20 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Jul 14 01:00:24 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/hadoop/mapred/HadoopInputFormatBase.java | 9 ++++++++-
 .../api/java/hadoop/mapred/HadoopOutputFormatBase.java      | 7 ++++++-
 .../api/java/hadoop/mapred/wrapper/HadoopInputSplit.java    | 4 ++++
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 40f6631..d5dbf38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
@@ -82,7 +83,13 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
 	
 	@Override
 	public void configure(Configuration parameters) {
-		// nothing to do
+		// configure MR InputFormat if necessary
+		if(this.mapredInputFormat instanceof Configurable) {
+			((Configurable)this.mapredInputFormat).setConf(this.jobConf);
+		}
+		else if(this.mapredInputFormat instanceof JobConfigurable) {
+			((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a6a318c..d6dfc2e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -67,9 +68,13 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
 	@Override
 	public void configure(Configuration parameters) {
-		if(this.mapredOutputFormat instanceof Configurable){
+		// configure MR OutputFormat if necessary
+		if(this.mapredOutputFormat instanceof Configurable) {
 			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
 		}
+		else if(this.mapredOutputFormat instanceof JobConfigurable) {
+			((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
index dee5452..d949dfd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 
 /**
  * A wrapper that represents an input split from the Hadoop mapred API as
@@ -113,6 +114,9 @@ public class HadoopInputSplit extends LocatableInputSplit {
 		if (hadoopInputSplit instanceof Configurable) {
 			((Configurable) hadoopInputSplit).setConf(this.jobConf);
 		}
+		else if (hadoopInputSplit instanceof JobConfigurable) {
+			((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
+		}
 		hadoopInputSplit.readFields(in);
 	}
 }


Mime
View raw message