hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r1052414 - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/or...
Date: Thu, 23 Dec 2010 23:52:42 GMT
Author: namit
Date: Thu Dec 23 23:52:41 2010
New Revision: 1052414

URL: http://svn.apache.org/viewvc?rev=1052414&view=rev
Log:
HIVE-1806 Merge per dynamic partition based on size of each dynamic partition
(Ning Zhang via namit)
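
In short: before this change the conditional merge decision was made once for the whole output directory, so either every dynamic partition was merged or none was. With this patch, ConditionalResolverMergeFiles inspects each dynamic-partition directory separately, sends only the partitions whose average file size falls below the configured threshold (and which contain more than one file) to the merge MR job, and simply moves the remaining partitions via the new LoadMultiFilesDesc. A minimal standalone sketch of that per-partition decision, using hypothetical file sizes rather than Hive's FileStatus plumbing (the real logic is getMergeSize() in the diff below):

    import java.util.Arrays;
    import java.util.List;

    // Simplified illustration of the HIVE-1806 per-partition merge decision;
    // the class name and the file sizes below are made up for illustration only.
    public class MergeDecisionSketch {
      // Returns the total size of a partition's files if they should be merged,
      // or -1 if no merge is needed (single file, or average size already
      // at/above the threshold).
      static long mergeSize(List<Long> fileSizes, long avgSizeThreshold) {
        if (fileSizes.size() <= 1) {
          return -1;
        }
        long total = 0;
        for (long len : fileSizes) {
          total += len;
        }
        return (total < avgSizeThreshold * fileSizes.size()) ? total : -1;
      }

      public static void main(String[] args) {
        long threshold = 3000L; // e.g. hive.merge.smallfiles.avgsize=3000, as set in the new test
        // a partition with several small files -> merge it (prints 4400)
        System.out.println(mergeSize(Arrays.asList(1200L, 900L, 1500L, 800L), threshold));
        // a partition with a single file -> just move it (prints -1)
        System.out.println(mergeSize(Arrays.asList(5901L), threshold));
      }
    }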


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out
Modified:
    hive/trunk/CHANGES.txt
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java

Modified: hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hive/trunk/CHANGES.txt?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/CHANGES.txt (original)
+++ hive/trunk/CHANGES.txt Thu Dec 23 23:52:41 2010
@@ -639,6 +639,9 @@ Trunk -  Unreleased
     HIVE-1456 No need to check for LOG as null in sort-merge join
     (Alexey Diomin via namit)
 
+    HIVE-1806 Merge per dynamic partition based on size of each dynamic partition
+    (Ning Zhang via namit)
+
   TESTS
 
     HIVE-1464. improve  test query performance

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Dec 23 23:52:41 2010
@@ -256,7 +256,7 @@ public class HiveConf extends Configurat
     HIVECONVERTJOIN("hive.auto.convert.join", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
-    HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M
+    HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
     MAPREDMINSPLITSIZE("mapred.min.split.size", 1),
     HIVEMERGEMAPONLY("hive.mergejob.maponly", true),
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Dec 23 23:52:41 2010
@@ -510,8 +510,7 @@ public class ExecDriver extends Task<Map
       job.setNumMapTasks(work.getNumMapTasks().intValue());
     }
     if (work.getMinSplitSize() != null) {
-      HiveConf.setIntVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize()
-          .intValue());
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
     }
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Dec 23 23:52:41 2010
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -60,6 +61,54 @@ public class MoveTask extends Task<MoveW
     super();
   }
 
+  private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
+      throws Exception {
+    FileSystem fs = sourcePath.getFileSystem(conf);
+    if (isDfsDir) {
+      // Just do a rename on the URIs, they belong to the same FS
+      String mesg = "Moving data to: " + targetPath.toString();
+      String mesg_detail = " from " + sourcePath.toString();
+      console.printInfo(mesg, mesg_detail);
+
+      // delete the output directory if it already exists
+      fs.delete(targetPath, true);
+      // if source exists, rename. Otherwise, create a empty directory
+      if (fs.exists(sourcePath)) {
+        if (!fs.rename(sourcePath, targetPath)) {
+          throw new HiveException("Unable to rename: " + sourcePath
+              + " to: " + targetPath);
+        }
+      } else if (!fs.mkdirs(targetPath)) {
+        throw new HiveException("Unable to make directory: " + targetPath);
+      }
+    } else {
+      // This is a local file
+      String mesg = "Copying data to local directory " + targetPath.toString();
+      String mesg_detail = " from " + sourcePath.toString();
+      console.printInfo(mesg, mesg_detail);
+
+      // delete the existing dest directory
+      LocalFileSystem dstFs = FileSystem.getLocal(conf);
+
+      if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
+        console.printInfo(mesg, mesg_detail);
+        // if source exists, rename. Otherwise, create a empty directory
+        if (fs.exists(sourcePath)) {
+          fs.copyToLocalFile(sourcePath, targetPath);
+        } else {
+          if (!dstFs.mkdirs(targetPath)) {
+            throw new HiveException("Unable to make local directory: "
+                + targetPath);
+          }
+        }
+      } else {
+        throw new AccessControlException(
+            "Unable to delete the existing destination directory: "
+            + targetPath);
+      }
+    }
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -70,49 +119,23 @@ public class MoveTask extends Task<MoveW
       if (lfd != null) {
         Path targetPath = new Path(lfd.getTargetDir());
         Path sourcePath = new Path(lfd.getSourceDir());
-        FileSystem fs = sourcePath.getFileSystem(conf);
-        if (lfd.getIsDfsDir()) {
-          // Just do a rename on the URIs, they belong to the same FS
-          String mesg = "Moving data to: " + lfd.getTargetDir();
-          String mesg_detail = " from " + lfd.getSourceDir();
-          console.printInfo(mesg, mesg_detail);
-
-          // delete the output directory if it already exists
-          fs.delete(targetPath, true);
-          // if source exists, rename. Otherwise, create a empty directory
-          if (fs.exists(sourcePath)) {
-            if (!fs.rename(sourcePath, targetPath)) {
-              throw new HiveException("Unable to rename: " + sourcePath
-                  + " to: " + targetPath);
-            }
-          } else if (!fs.mkdirs(targetPath)) {
-            throw new HiveException("Unable to make directory: " + targetPath);
-          }
-        } else {
-          // This is a local file
-          String mesg = "Copying data to local directory " + lfd.getTargetDir();
-          String mesg_detail = " from " + lfd.getSourceDir();
-          console.printInfo(mesg, mesg_detail);
-
-          // delete the existing dest directory
-          LocalFileSystem dstFs = FileSystem.getLocal(conf);
-
-          if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
-            console.printInfo(mesg, mesg_detail);
-            // if source exists, rename. Otherwise, create a empty directory
-            if (fs.exists(sourcePath)) {
-              fs.copyToLocalFile(sourcePath, targetPath);
-            } else {
-              if (!dstFs.mkdirs(targetPath)) {
-                throw new HiveException("Unable to make local directory: "
-                    + targetPath);
-              }
-            }
-          } else {
-            throw new AccessControlException(
-                "Unable to delete the existing destination directory: "
-                + targetPath);
-          }
+        moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
+      }
+
+      // Multi-file load is for dynamic partitions when some partitions do not
+      // need to merge and they can simply be moved to the target directory.
+      LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork();
+      if (lmfd != null) {
+        Path destPath = new Path(lmfd.getTargetDir());
+        FileSystem fs = destPath.getFileSystem(conf);
+        if (!fs.exists(destPath)) {
+          fs.mkdirs(destPath);
+        }
+        boolean isDfsDir = lmfd.getIsDfsDir();
+        for (String s: lmfd.getSourceDirs()) {
+          Path srcPath = new Path(s);
+          Path dstPath = new Path(destPath, srcPath.getName());
+          moveFile(srcPath, dstPath, isDfsDir);
         }
       }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Thu Dec 23 23:52:41 2010
@@ -56,6 +56,7 @@ public class TaskRunner extends Thread {
     try {
       exitVal = tsk.executeTask();
     } catch (Throwable t) {
+      t.printStackTrace();
     }
     result.setExitVal(exitVal);
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Thu Dec 23 23:52:41 2010
@@ -246,7 +246,7 @@ public class GenMRFileSink1 implements N
 
     // NOTE: we should gather stats in MR1 (rather than the merge MR job)
     // since it is unknown if the merge MR will be triggered at execution time.
-    
+
     MoveWork dummyMv = new MoveWork(null, null, null,
         new LoadFileDesc(fsConf.getDirName(), finalName, true, null, null), false);
 
@@ -271,8 +271,7 @@ public class GenMRFileSink1 implements N
     // create a Map-only job for merge, otherwise create a MapReduce merge job.
     ParseContext parseCtx = ctx.getParseCtx();
     HiveConf conf = parseCtx.getConf();
-    if ((conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
-         conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) &&
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPONLY) &&
         Utilities.supportCombineFileInputFormat()) {
       // create Map-only merge job
       createMap4Merge(fsOp, ctx, finalName);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Thu Dec 23 23:52:41 2010
@@ -316,7 +316,7 @@ public final class GenMRSkewJoinProcesso
       newPlan.setNumMapTasks(HiveConf
           .getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
       newPlan
-          .setMinSplitSize(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
+          .setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
       newPlan.setInputformat(HiveInputFormat.class.getName());
       Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
           newPlan, jc);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Thu Dec 23 23:52:41 2010
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 public class ConditionalResolverMergeFiles implements ConditionalResolver,
     Serializable {
   private static final long serialVersionUID = 1L;
+  static final private Log LOG = LogFactory.getLog(ConditionalResolverMergeFiles.class.getName());
 
   public ConditionalResolverMergeFiles() {
   }
@@ -118,88 +121,137 @@ public class ConditionalResolverMergeFil
         .getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESAVGSIZE);
     trgtSize = trgtSize > avgConditionSize ? trgtSize : avgConditionSize;
 
+    Task<? extends Serializable> mvTask = ctx.getListTasks().get(0);
+    Task<? extends Serializable> mrTask = ctx.getListTasks().get(1);
+
     try {
-      // If the input file does not exist, replace it by a empty file
       Path dirPath = new Path(dirName);
       FileSystem inpFs = dirPath.getFileSystem(conf);
-      
-      if (inpFs.exists(dirPath)) {
-        DynamicPartitionCtx dpCtx = ctx.getDPCtx();
-        boolean doMerge = false;
-        FileStatus[] fStats = null;
-        if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
-          fStats = Utilities.getFileStatusRecurse(dirPath,
-              dpCtx.getNumDPCols() + 1, inpFs);
-
-        } else {
-          fStats = inpFs.listStatus(dirPath);
-        }
+      DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 
-        long totalSz = 0;
-        for (FileStatus fStat : fStats) {
-          totalSz += fStat.getLen();
-        }
-        long currAvgSz = totalSz / fStats.length;
-        doMerge = (currAvgSz < avgConditionSize) && (fStats.length > 1);
+      if (inpFs.exists(dirPath)) {
+        // For each dynamic partition, check if it needs to be merged.
+        MapredWork work = (MapredWork) mrTask.getWork();
 
-        if (doMerge) {
-          //
-          // for each dynamic partition, generate a merge task
-          // populate aliasToWork, pathToPartitionInfo, pathToAlias
-          // also set the number of reducers
-          //
-          Task<? extends Serializable> tsk = ctx.getListTasks().get(1);
-          MapredWork work = (MapredWork) tsk.getWork();
-
-
-          // Dynamic partition: replace input path (root to dp paths) with dynamic partition
-          // input paths.
-          if (dpCtx != null &&  dpCtx.getNumDPCols() > 0) {
-            FileStatus[] status = Utilities.getFileStatusRecurse(dirPath,
-                dpCtx.getNumDPCols(), inpFs);
-
-            // cleanup pathToPartitionInfo
-          	Map<String, PartitionDesc> ptpi = work.getPathToPartitionInfo();
-          	assert ptpi.size() == 1;
-          	String path = ptpi.keySet().iterator().next();
-          	TableDesc tblDesc = ptpi.get(path).getTableDesc();
-          	ptpi.remove(path); // the root path is not useful anymore
-
-          	// cleanup pathToAliases
-          	Map<String, ArrayList<String>> pta = work.getPathToAliases();
-          	assert pta.size() == 1;
-          	path = pta.keySet().iterator().next();
-          	ArrayList<String> aliases = pta.get(path);
-          	pta.remove(path); // the root path is not useful anymore
-
-          	// populate pathToPartitionInfo and pathToAliases w/ DP paths
-          	for (int i = 0; i < status.length; ++i) {
-          	  work.getPathToAliases().put(status[i].getPath().toString(), aliases);
-          	  // get the full partition spec from the path and update the PartitionDesc
-          	  Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
-          	      dpCtx.getPartSpec());
-          	  Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
-          	  PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec);
-          	  work.getPathToPartitionInfo().put(
-          	      status[i].getPath().toString(),
-          	      pDesc);
+        // Dynamic partition: replace input path (root to dp paths) with dynamic partition
+        // input paths.
+        if (dpCtx != null &&  dpCtx.getNumDPCols() > 0) {
+
+          // get list of dynamic partitions
+          FileStatus[] status = Utilities.getFileStatusRecurse(dirPath,
+              dpCtx.getNumDPCols(), inpFs);
+
+          // cleanup pathToPartitionInfo
+          Map<String, PartitionDesc> ptpi = work.getPathToPartitionInfo();
+          assert ptpi.size() == 1;
+          String path = ptpi.keySet().iterator().next();
+          TableDesc tblDesc = ptpi.get(path).getTableDesc();
+          ptpi.remove(path); // the root path is not useful anymore
+
+          // cleanup pathToAliases
+          Map<String, ArrayList<String>> pta = work.getPathToAliases();
+          assert pta.size() == 1;
+          path = pta.keySet().iterator().next();
+          ArrayList<String> aliases = pta.get(path);
+          pta.remove(path); // the root path is not useful anymore
+
+          // populate pathToPartitionInfo and pathToAliases w/ DP paths
+          long totalSz = 0;
+          boolean doMerge = false;
+          // list of paths that don't need to merge but need to move to the dest location
+          List<String> toMove = new ArrayList<String>();
+          for (int i = 0; i < status.length; ++i) {
+            long len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
+            if (len >= 0) {
+              doMerge = true;
+              totalSz += len;
+              work.getPathToAliases().put(status[i].getPath().toString(), aliases);
+              // get the full partition spec from the path and update the PartitionDesc
+              Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
+                  dpCtx.getPartSpec());
+              Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
+              PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec);
+              work.getPathToPartitionInfo().put(status[i].getPath().toString(), pDesc);
+            } else {
+              toMove.add(status[i].getPath().toString());
+            }
+          }
+          if (doMerge) {
+            // add the merge MR job
+            setupMapRedWork(conf, work, trgtSize, totalSz);
+            resTsks.add(mrTask);
+
+            // add the move task for those partitions that do not need merging
+          	if (toMove.size() > 0) { //
+          	  // modify the existing move task as it is already in the candidate running tasks
+          	  MoveWork mvWork = (MoveWork) mvTask.getWork();
+          	  LoadFileDesc lfd = mvWork.getLoadFileWork();
+          	  LoadMultiFilesDesc lmfd = new LoadMultiFilesDesc(toMove,
+          	      lfd.getTargetDir(), lfd.getIsDfsDir(), lfd.getColumns(), lfd.getColumnTypes());
+          	  mvWork.setLoadFileWork(null);
+          	  mvWork.setLoadTableWork(null);
+          	  mvWork.setMultiFilesDesc(lmfd);
+          	  resTsks.add(mvTask);
           	}
-          } else {
-            int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-            int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize);
-            reducers = Math.max(1, reducers);
-            reducers = Math.min(maxReducers, reducers);
-            work.setNumReduceTasks(reducers);
+          } else { // add the move task
+            resTsks.add(mvTask);
+          }
+        } else { // no dynamic partitions
+          long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
+          if (totalSz >= 0) { // add the merge job
+            setupMapRedWork(conf, work, trgtSize, totalSz);
+            resTsks.add(mrTask);
+          } else { // don't need to merge, add the move job
+            resTsks.add(mvTask);
           }
-
-          resTsks.add(tsk);
-          return resTsks;
         }
+      } else {
+        resTsks.add(mvTask);
       }
     } catch (IOException e) {
       e.printStackTrace();
     }
-    resTsks.add(ctx.getListTasks().get(0));
     return resTsks;
   }
+
+  private void setupMapRedWork(HiveConf conf, MapredWork work, long targetSize, long totalSize) {
+    if (work.getNumReduceTasks() > 0) {
+      int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+      int reducers = (int) ((totalSize + targetSize - 1) / targetSize);
+      reducers = Math.max(1, reducers);
+      reducers = Math.min(maxReducers, reducers);
+      work.setNumReduceTasks(reducers);
+    }
+    work.setMinSplitSize(targetSize);
+  }
+  /**
+   * Whether to merge files inside directory given the threshold of the average file size.
+   *
+   * @param inpFs input file system.
+   * @param dirPath input file directory.
+   * @param avgSize threshold of average file size.
+   * @return -1 if not need to merge (either because of there is only 1 file or the
+   * average size is larger than avgSize). Otherwise the size of the total size of files.
+   * If return value is 0 that means there are multiple files each of which is an empty file.
+   * This could be true when the table is bucketized and all buckets are empty.
+   */
+  private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
+    try {
+      FileStatus[] fStats = inpFs.listStatus(dirPath);
+      if (fStats.length <= 1) {
+        return -1;
+      }
+      long totalSz = 0;
+      for (FileStatus fStat : fStats) {
+        totalSz += fStat.getLen();
+      }
+      if (totalSz < avgSize * fStats.length) {
+        return totalSz;
+      } else {
+        return -1;
+      }
+    } catch (IOException e) {
+      return -1;
+    }
+  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java?rev=1052414&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java Thu Dec 23 23:52:41 2010
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * LoadMultiFilesDesc.
+ *
+ */
+public class LoadMultiFilesDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String targetDir;
+  private boolean isDfsDir;
+  // list of columns, comma separated
+  private String columns;
+  private String columnTypes;
+  private List<String> srcDirs;
+
+  public LoadMultiFilesDesc() {
+  }
+
+  public LoadMultiFilesDesc(final List<String> sourceDirs, final String targetDir,
+      final boolean isDfsDir, final String columns, final String columnTypes) {
+
+    this.srcDirs = sourceDirs;
+    this.targetDir = targetDir;
+    this.isDfsDir = isDfsDir;
+    this.columns = columns;
+    this.columnTypes = columnTypes;
+  }
+
+  @Explain(displayName = "destination")
+  public String getTargetDir() {
+    return targetDir;
+  }
+
+  @Explain(displayName = "sources")
+  public List<String> getSourceDirs() {
+    return srcDirs;
+  }
+
+  public void setSourceDirs(List<String> srcs) {
+    this.srcDirs = srcs;
+  }
+
+  public void setTargetDir(final String targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  @Explain(displayName = "hdfs directory")
+  public boolean getIsDfsDir() {
+    return isDfsDir;
+  }
+
+  public void setIsDfsDir(final boolean isDfsDir) {
+    this.isDfsDir = isDfsDir;
+  }
+
+  /**
+   * @return the columns
+   */
+  public String getColumns() {
+    return columns;
+  }
+
+  /**
+   * @param columns
+   *          the columns to set
+   */
+  public void setColumns(String columns) {
+    this.columns = columns;
+  }
+
+  /**
+   * @return the columnTypes
+   */
+  public String getColumnTypes() {
+    return columnTypes;
+  }
+
+  /**
+   * @param columnTypes
+   *          the columnTypes to set
+   */
+  public void setColumnTypes(String columnTypes) {
+    this.columnTypes = columnTypes;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Thu Dec 23 23:52:41 2010
@@ -60,7 +60,7 @@ public class MapredWork implements Seria
 
   private Integer numReduceTasks;
   private Integer numMapTasks;
-  private Integer minSplitSize;
+  private Long minSplitSize;
 
   private boolean needsTagging;
   private boolean hadoopSupportsSplittable;
@@ -315,11 +315,11 @@ public class MapredWork implements Seria
     this.hadoopSupportsSplittable = hadoopSupportsSplittable;
   }
 
-  public Integer getMinSplitSize() {
+  public Long getMinSplitSize() {
     return minSplitSize;
   }
 
-  public void setMinSplitSize(Integer minSplitSize) {
+  public void setMinSplitSize(Long minSplitSize) {
     this.minSplitSize = minSplitSize;
   }
 

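The Integer-to-Long switch in MapredWork above goes together with the HiveConf, ExecDriver and GenMRSkewJoinProcessor changes earlier in this commit: minSplitSize now receives the long-valued merge target size from setupMapRedWork(), and a value above Integer.MAX_VALUE would silently truncate if forced through an int. A small hypothetical illustration of that truncation:

    public class SplitSizeSketch {
      public static void main(String[] args) {
        long mergeTargetSize = 4L * 1024 * 1024 * 1024; // a hypothetical 4 GB target size
        System.out.println(mergeTargetSize);            // 4294967296
        System.out.println((int) mergeTargetSize);      // 0 -- the value is lost as an int
      }
    }
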
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java?rev=1052414&r1=1052413&r2=1052414&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java Thu Dec 23 23:52:41 2010
@@ -36,6 +36,7 @@ public class MoveWork implements Seriali
   private static final long serialVersionUID = 1L;
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
+  private LoadMultiFilesDesc loadMultiFilesWork;
 
   private boolean checkFileFormat;
   ArrayList<String> dpSpecPaths; // dynamic partition specified paths -- the root of DP columns
@@ -93,6 +94,15 @@ public class MoveWork implements Seriali
     return loadFileWork;
   }
 
+  @Explain(displayName = "files")
+  public LoadMultiFilesDesc getLoadMultiFilesWork() {
+    return loadMultiFilesWork;
+  }
+
+  public void setMultiFilesDesc(LoadMultiFilesDesc lmfd) {
+    this.loadMultiFilesWork = lmfd;
+  }
+
   public void setLoadFileWork(final LoadFileDesc loadFileWork) {
     this.loadFileWork = loadFileWork;
   }

Added: hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q?rev=1052414&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q Thu Dec 23 23:52:41 2010
@@ -0,0 +1,27 @@
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+create table srcpart_merge_dp like srcpart;
+
+create table merge_dynamic_part like srcpart;
+
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; 
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+set hive.merge.smallfiles.avgsize=3000;
+set hive.exec.compress.output=false;
+
+explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';
+
+show table extended like `merge_dynamic_part`;
+

Added: hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out?rev=1052414&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out Thu Dec 23 23:52:41 2010
@@ -0,0 +1,163 @@
+PREHOOK: query: create table srcpart_merge_dp like srcpart
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table srcpart_merge_dp like srcpart
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcpart_merge_dp
+PREHOOK: query: create table merge_dynamic_part like srcpart
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table merge_dynamic_part like srcpart
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@merge_dynamic_part
+PREHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: query: load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: query: explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF srcpart_merge_dp)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB merge_dynamic_part (TOK_PARTSPEC (TOK_PARTVAL ds '2008-04-08') (TOK_PARTVAL hr)))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_TABLE_OR_COL hr))) (TOK_WHERE (= (TOK_TABLE_OR_COL ds) '2008-04-08'))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
+  Stage-4
+  Stage-0 depends on stages: Stage-4, Stage-3
+  Stage-2 depends on stages: Stage-0
+  Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        srcpart_merge_dp 
+          TableScan
+            alias: srcpart_merge_dp
+            Filter Operator
+              predicate:
+                  expr: (ds = '2008-04-08')
+                  type: boolean
+              Select Operator
+                expressions:
+                      expr: key
+                      type: string
+                      expr: value
+                      type: string
+                      expr: hr
+                      type: string
+                outputColumnNames: _col0, _col1, _col2
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 1
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: merge_dynamic_part
+
+  Stage: Stage-5
+    Conditional Operator
+
+  Stage: Stage-4
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: pfile:/data/users/nzhang/work/2/apache-hive/build/ql/scratchdir/hive_2010-12-15_11-09-33_994_7142549978626881266/-ext-10000
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 2008-04-08
+            hr 
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: merge_dynamic_part
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+        pfile:/data/users/nzhang/work/2/apache-hive/build/ql/scratchdir/hive_2010-12-15_11-09-33_994_7142549978626881266/-ext-10002 
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: merge_dynamic_part
+
+
+PREHOOK: query: insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: Output: default@merge_dynamic_part@ds=2008-04-08
+POSTHOOK: query: insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@merge_dynamic_part@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@merge_dynamic_part@ds=2008-04-08/hr=12
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: show table extended like `merge_dynamic_part`
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like `merge_dynamic_part`
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+tableName:merge_dynamic_part
+owner:null
+location:pfile:/data/users/nzhang/work/2/apache-hive/build/ql/test/data/warehouse/merge_dynamic_part
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:true
+partitionColumns:struct partition_columns { string ds, string hr}
+totalNumberFiles:3
+totalFileSize:17415
+maxFileSize:5901
+minFileSize:5702
+lastAccessTime:0
+lastUpdateTime:1292440184000
+


