Author: gunther
Date: Sat Jan 25 05:12:24 2014
New Revision: 1561256
URL: http://svn.apache.org/r1561256
Log:
HIVE-6263: Avoid sending input files multiple times on Tez (Patch by Gunther Hagleitner, reviewed by Vikram Dixit K)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1561256&r1=1561255&r2=1561256&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jan 25 05:12:24 2014
@@ -2917,6 +2917,16 @@ public final class Utilities {
}
/**
+ * On Tez we're not creating dummy files when getting/setting input paths.
+ * We let Tez handle the situation. We're also setting the paths in the AM
+ * so we don't want to depend on scratch dir and context.
+ */
+ public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception {
+ List<Path> paths = getInputPaths(job, work, null, null);
+ return paths;
+ }
+
+ /**
* Computes a list of all input paths needed to compute the given MapWork. All aliases
* are considered and a merged list of input paths is returned. If any input path points
* to an empty table or partition a dummy file in the scratch dir is instead created and
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1561256&r1=1561255&r2=1561256&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Jan 25 05:12:24 2014
@@ -209,11 +209,6 @@ public class DagUtils {
// set up the operator plan
Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
- // setup input paths and split info
- List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
- mrScratchDir, ctx);
- Utilities.setInputPaths(conf, inputPaths);
-
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, mapWork);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1561256&r1=1561255&r2=1561256&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Sat Jan 25 05:12:24 2014
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -114,8 +116,23 @@ public class BucketizedHiveInputFormat<K
Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
+ // on tez we're avoiding to duplicate the file info in FileInputFormat.
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+ dirs = paths.toArray(new Path[paths.size()]);
+ if (dirs.length == 0) {
+ // if we still don't have any files it's time to fail.
+ throw new IOException("No input paths specified in job");
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not create input paths", e);
+ }
+ } else {
+ throw new IOException("No input paths specified in job");
+ }
}
+
JobConf newjob = new JobConf(job);
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1561256&r1=1561255&r2=1561256&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Sat Jan 25 05:12:24 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -271,6 +272,17 @@ public class CombineHiveInputFormat<K ex
mrwork.getAliasToWork();
CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
.getCombineFileInputFormat();
+
+ // on tez we're avoiding duplicating path info since the info will go over
+ // rpc
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
+ Utilities.setInputPaths(job, dirs);
+ } catch (Exception e) {
+ throw new IOException("Could not create input paths", e);
+ }
+ }
InputSplit[] splits = null;
if (combine == null) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1561256&r1=1561255&r2=1561256&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sat Jan 25 05:12:24 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -300,7 +301,21 @@ public class HiveInputFormat<K extends W
Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
+ // on tez we're avoiding to duplicate the file info in FileInputFormat.
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+ dirs = paths.toArray(new Path[paths.size()]);
+ if (dirs.length == 0) {
+ // if we still don't have any files it's time to fail.
+ throw new IOException("No input paths specified in job");
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not create input files", e);
+ }
+ } else {
+ throw new IOException("No input paths specified in job");
+ }
}
JobConf newjob = new JobConf(job);
List<InputSplit> result = new ArrayList<InputSplit>();
|