hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1652910 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
Date: Mon, 19 Jan 2015 05:23:43 GMT
Author: xuefu
Date: Mon Jan 19 05:23:43 2015
New Revision: 1652910

URL: http://svn.apache.org/r1652910
Log:
HIVE-9372: Parallel checking non-combinable paths in CombineHiveInputFormat (Rui via Xuefu)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
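
In brief, the patch moves the per-path AvoidSplitCombination check out of the serial loop in getSplits() and runs it on a fixed-size thread pool: one Callable per contiguous chunk of the input paths, with the per-chunk results merged through Futures. Below is a minimal, standalone sketch of that pattern. It is not the patched Hive code itself; the class name and the isNonCombinable() predicate are illustrative stand-ins for the real check, AvoidSplitCombination.shouldSkipCombine(path, conf).

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelPathCheckSketch {

  // Illustrative stand-in for AvoidSplitCombination.shouldSkipCombine(path, conf):
  // treat any path ending in ".nosplit" as non-combinable.
  static boolean isNonCombinable(String path) {
    return path.endsWith(".nosplit");
  }

  public static void main(String[] args) throws Exception {
    final String[] paths = {"/t/a.orc", "/t/b.nosplit", "/t/c.orc", "/t/d.nosplit"};

    final int maxThreads = 50;       // mirrors MAX_CHECK_NONCOMBINABLE_THREAD_NUM
    final int pathsPerThread = 100;  // mirrors DEFAULT_NUM_PATH_PER_THREAD

    int numThreads = Math.min(maxThreads,
        (int) Math.ceil((double) paths.length / pathsPerThread));
    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);

    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    List<Future<Set<Integer>>> futures = new ArrayList<Future<Set<Integer>>>(numThreads);
    try {
      // Submit one Callable per contiguous chunk of the path array.
      for (int t = 0; t < numThreads; t++) {
        final int start = t * numPathPerThread;
        final int length = (t == numThreads - 1) ? paths.length - start : numPathPerThread;
        futures.add(executor.submit(new Callable<Set<Integer>>() {
          @Override
          public Set<Integer> call() {
            Set<Integer> nonCombinable = new HashSet<Integer>();
            for (int i = 0; i < length; i++) {
              if (isNonCombinable(paths[start + i])) {
                nonCombinable.add(start + i);  // report indices into the full array
              }
            }
            return nonCombinable;
          }
        }));
      }
      // Merge per-chunk results, then partition the paths in their original order.
      Set<Integer> nonCombinableIndices = new HashSet<Integer>();
      for (Future<Set<Integer>> f : futures) {
        nonCombinableIndices.addAll(f.get());
      }
      List<String> nonCombinablePaths = new ArrayList<String>();
      List<String> combinablePaths = new ArrayList<String>();
      for (int i = 0; i < paths.length; i++) {
        (nonCombinableIndices.contains(i) ? nonCombinablePaths : combinablePaths).add(paths[i]);
      }
      System.out.println("non-combinable: " + nonCombinablePaths);
      System.out.println("combinable:     " + combinablePaths);
    } finally {
      executor.shutdownNow();
    }
  }
}

As in the patch, each task reports indices into the original array so the caller can split the paths back into combinable and non-combinable lists in their original order, and shutdownNow() in the finally block ensures the pool is torn down even if a check fails.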

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1652910&r1=1652909&r2=1652910&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Jan 19 05:23:43 2015
@@ -30,6 +30,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,6 +78,48 @@ public class CombineHiveInputFormat<K ex
   private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
   public static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
+  // max number of threads we can use to check non-combinable paths
+  private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
+  private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
+
+  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
+    private final Path[] paths;
+    private final int start;
+    private final int length;
+    private final JobConf conf;
+
+    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+      this.paths = paths;
+      this.start = start;
+      this.length = length;
+      this.conf = conf;
+    }
+
+    @Override
+    public Set<Integer> call() throws Exception {
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (int i = 0; i < length; i++) {
+        PartitionDesc part =
+            HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+                pathToPartitionInfo, paths[i + start],
+                IOPrepareCache.get().allocatePartitionDescMap());
+        // Use HiveInputFormat if any of the paths is not splittable
+        Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
+        InputFormat<WritableComparable, Writable> inputFormat =
+            getInputFormatFromCache(inputFormatClass, conf);
+        if (inputFormat instanceof AvoidSplitCombination &&
+            ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The path [" + paths[i + start] +
+                "] is being parked for HiveInputFormat.getSplits");
+          }
+          nonCombinablePathIndices.add(i + start);
+        }
+      }
+      return nonCombinablePathIndices;
+    }
+  }
+
   /**
    * CombineHiveInputSplit encapsulates an InputSplit with its corresponding
    * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks
@@ -278,8 +324,6 @@ public class CombineHiveInputFormat<K ex
   private InputSplit[] getCombineSplits(JobConf job, int numSplits,
       Map<String, PartitionDesc> pathToPartitionInfo)
       throws IOException {
-    PerfLogger perfLogger = PerfLogger.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
     Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -290,7 +334,6 @@ public class CombineHiveInputFormat<K ex
     InputSplit[] splits = null;
     if (combine == null) {
       splits = super.getSplits(job, numSplits);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
       return splits;
     }
 
@@ -349,7 +392,6 @@ public class CombineHiveInputFormat<K ex
           } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
             //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
             splits = super.getSplits(job, numSplits);
-            perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
             return splits;
           }
 
@@ -363,7 +405,6 @@ public class CombineHiveInputFormat<K ex
                   fStatus[idx].getPath()) != null) {
                 //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
                 splits = super.getSplits(job, numSplits);
-                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
                 return splits;
               }
             }
@@ -373,7 +414,6 @@ public class CombineHiveInputFormat<K ex
       //don't combine if inputformat is a SymlinkTextInputFormat
       if (inputFormat instanceof SymlinkTextInputFormat) {
         splits = super.getSplits(job, numSplits);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
         return splits;
       }
 
@@ -451,7 +491,6 @@ public class CombineHiveInputFormat<K ex
     }
 
     LOG.info("number of splits " + result.size());
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
@@ -460,6 +499,8 @@ public class CombineHiveInputFormat<K ex
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
 
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
@@ -469,26 +510,37 @@ public class CombineHiveInputFormat<K ex
     List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
     List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
 
-    for (Path path : paths) {
-
-      PartitionDesc part =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(
-              pathToPartitionInfo, path,
-              IOPrepareCache.get().allocatePartitionDescMap());
-
-      // Use HiveInputFormat if any of the paths is not splittable
-      Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
-      InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      if (inputFormat instanceof AvoidSplitCombination &&
-          ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The split [" + path +
-              "] is being parked for HiveInputFormat.getSplits");
+    int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+        (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
+    LOG.info("Total number of paths: " + paths.length +
+        ", launching " + numThreads + " threads to check non-combinable ones.");
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+    try {
+      for (int i = 0; i < numThreads; i++) {
+        int start = i * numPathPerThread;
+        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+        futureList.add(executor.submit(
+            new CheckNonCombinablePathCallable(paths, start, length, job)));
+      }
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (Future<Set<Integer>> future : futureList) {
+        nonCombinablePathIndices.addAll(future.get());
+      }
+      for (int i = 0; i < paths.length; i++) {
+        if (nonCombinablePathIndices.contains(i)) {
+          nonCombinablePaths.add(paths[i]);
+        } else {
+          combinablePaths.add(paths[i]);
         }
-        nonCombinablePaths.add(path);
-      } else {
-        combinablePaths.add(path);
       }
+    } catch (Exception e) {
+      LOG.error("Error checking non-combinable path", e);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+      throw new IOException(e);
+    } finally {
+      executor.shutdownNow();
     }
 
     // Store the previous value for the path specification
@@ -528,6 +580,7 @@ public class CombineHiveInputFormat<K ex
       job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
     }
     LOG.info("Number of all splits " + result.size());
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
 


