hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sp...@apache.org
Subject [1/2] hive git commit: HIVE-15361: INSERT dynamic partition on S3 fails with a MoveTask failure (Sergio Pena, reviewed by Mohit Sabharwal and Illya Yalovyy)
Date Wed, 07 Dec 2016 20:17:15 GMT
Repository: hive
Updated Branches:
  refs/heads/master d60802d6a -> c0978844b


http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index adc1188..76204e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
@@ -1349,19 +1350,15 @@ public final class GenMapRedUtils {
     cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
     // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
     // know if merge MR2 will be triggered at execution time
+    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOutput);
     ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
-        fsInputDesc.getFinalDirName().toString());
+        fsInputDesc.getFinalDirName(), finalName, mvTask, dependencyTask);
 
     // keep the dynamic partition context in conditional task resolver context
     ConditionalResolverMergeFilesCtx mrCtx =
         (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
     mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
     mrCtx.setLbCtx(fsInputDesc.getLbCtx());
-
-    //
-    // 3. add the moveTask as the children of the conditional task
-    //
-    linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
   }
 
   /**
@@ -1434,60 +1431,12 @@ public final class GenMapRedUtils {
           parentTask.addDependentTask(mvTask);
         }
       } else {
-        if (BlobStorageUtils.areOptimizationsEnabled(hconf) && parentTask instanceof
MoveTask && areMoveTasksOnSameBlobStorage(hconf, (Task<MoveWork>)parentTask,
mvTask)) {
-          mergeMoveTasks((Task<MoveWork>)parentTask, mvTask);
-        } else {
-          parentTask.addDependentTask(mvTask);
-        }
+        parentTask.addDependentTask(mvTask);
       }
     }
   }
 
   /**
-   * Compare if moveTask1 source path is on the same filesystem as moveTask2 destination
path.
-   *
-   * @param hconf Configuration object
-   * @param moveTask1 First MoveTask where the source will be compared.
-   * @param moveTask2 Second MoveTask where the destination will be compared.
-   * @return True if source/destination are on the same filesystem; False otherwise.
-   */
-  private static boolean areMoveTasksOnSameBlobStorage(HiveConf hconf, Task<MoveWork>
moveTask1, Task<MoveWork> moveTask2) {
-    Path sourcePath1, targetPath2;
-
-    MoveWork moveWork1 = moveTask1.getWork();
-    MoveWork moveWork2 = moveTask2.getWork();
-
-    // Let's not merge the tasks in case both file and table work are present. This should
not
-    // be configured this way, but the API allows you to do that.
-    if (moveWork1.getLoadFileWork() != null && moveWork1.getLoadTableWork() != null)
{ return false; }
-    if (moveWork2.getLoadFileWork() != null && moveWork2.getLoadTableWork() != null)
{ return false; }
-
-    if (moveWork1.getLoadFileWork() != null) {
-      sourcePath1 = moveWork1.getLoadFileWork().getSourcePath();
-    } else if (moveWork1.getLoadTableWork() != null) {
-      sourcePath1 = moveWork1.getLoadTableWork().getSourcePath();
-    } else {
-      // Multi-files is not supported on this optimization
-      return false;
-    }
-
-    if (moveWork2.getLoadFileWork() != null) {
-      targetPath2 = moveWork2.getLoadFileWork().getTargetDir();
-    } else if (moveWork2.getLoadTableWork() != null) {
-      targetPath2 = getTableLocationPath(hconf, moveWork2.getLoadTableWork().getTable());
-    } else {
-      // Multi-files is not supported on this optimization
-      return false;
-    }
-
-    if (sourcePath1 != null && targetPath2 != null && BlobStorageUtils.isBlobStoragePath(hconf,
sourcePath1)) {
-      return sourcePath1.toUri().getScheme().equals(targetPath2.toUri().getScheme());
-    } else {
-      return false;
-    }
-  }
-
-  /**
    * Returns the table location path from a TableDesc object.
    *
    * @param hconf Configuration object.
@@ -1507,69 +1456,6 @@ public final class GenMapRedUtils {
   }
 
   /**
-   * Creates a new MoveTask that uses the moveTask1 source and moveTask2 destination as new
-   * source/destination paths. This function is useful when two MoveTask are found on the
-   * execution plan, and they are join each other.
-   *
-   * @param moveTask1 First MoveTask where the source path will be used.
-   * @param moveTask2 Second MoveTask where the destination path will be used.
-   */
-  private static void mergeMoveTasks(Task<MoveWork> moveTask1, Task<MoveWork>
moveTask2) {
-    Path sourcePath1;
-    LoadTableDesc loadTableDesc = null;
-    LoadFileDesc loadFileDesc = null;
-
-    MoveWork moveWork1 = moveTask1.getWork();
-    MoveWork moveWork2 = moveTask2.getWork();
-
-    // Let's not merge the tasks in case both file and table work are present. This should
not
-    // be configured this way, but the API allows you to do that.
-    if (moveWork1.getLoadFileWork() != null && moveWork1.getLoadTableWork() != null)
{ return; }
-    if (moveWork2.getLoadFileWork() != null && moveWork2.getLoadTableWork() != null)
{ return; }
-
-    if (moveWork1.getLoadFileWork() != null) {
-      sourcePath1 = moveTask1.getWork().getLoadFileWork().getSourcePath();
-    } else if (moveWork1.getLoadTableWork() != null) {
-      sourcePath1 = moveTask1.getWork().getLoadTableWork().getSourcePath();
-    } else {
-      // Multi-files is not supported on this optimization
-      return;
-    }
-
-    if (moveTask2.getWork().getLoadFileWork() != null) {
-      loadFileDesc = new LoadFileDesc(
-          sourcePath1,
-          moveWork2.getLoadFileWork().getTargetDir(),
-          moveWork2.getLoadFileWork().getIsDfsDir(),
-          moveWork2.getLoadFileWork().getColumns(),
-          moveWork2.getLoadFileWork().getColumnTypes()
-      );
-    } else if (moveTask2.getWork().getLoadTableWork() != null) {
-      loadTableDesc = new LoadTableDesc(
-          sourcePath1,
-          moveWork2.getLoadTableWork().getTable(),
-          moveWork2.getLoadTableWork().getPartitionSpec(),
-          moveWork2.getLoadTableWork().getReplace(),
-          moveWork2.getLoadTableWork().getWriteType()
-      );
-    } else {
-      // Multi-files is not supported on this optimization
-      return;
-    }
-
-    moveWork1.setLoadTableWork(loadTableDesc);
-    moveWork1.setLoadFileWork(loadFileDesc);
-    moveWork1.setCheckFileFormat(moveWork2.getCheckFileFormat());
-
-    // Link task2 dependent tasks to MoveTask1
-    if (moveTask2.getDependentTasks() != null) {
-      for (Task dependentTask : moveTask2.getDependentTasks()) {
-        moveTask1.addDependentTask(dependentTask);
-      }
-    }
-  }
-
-  /**
    * Add the StatsTask as a dependent task of the MoveTask
    * because StatsTask will change the Table/Partition metadata. For atomicity, we
    * should not change it before the data is actually there done by MoveTask.
@@ -1747,24 +1633,102 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Checks whether the given input/output paths and a linked MoveWork should be merged into
one only MoveWork.
+   * This is an optimization for BlobStore systems to avoid doing two renames/copies that
are not necessary.
+   *
+   * @param conf A HiveConf object to check if BlobStorage optimizations are enabled.
+   * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks.
+   * @param condOutputPath A path that the ConditionalTask uses as output for its sub-tasks.
+   * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks.
+   * @return True if both Conditional input/output paths and the linked MoveWork should be
merged.
+   */
+  @VisibleForTesting
+  protected static boolean shouldMergeMovePaths(HiveConf conf, Path condInputPath, Path condOutputPath,
MoveWork linkedMoveWork) {
+    Path linkedSourcePath, linkedTargetPath;
+
+    if (linkedMoveWork == null || !BlobStorageUtils.areOptimizationsEnabled(conf)) {
+      return false;
+    }
+
+    if (linkedMoveWork.getLoadFileWork() != null && linkedMoveWork.getLoadTableWork()
== null) {
+      linkedSourcePath = linkedMoveWork.getLoadFileWork().getSourcePath();
+      linkedTargetPath = linkedMoveWork.getLoadFileWork().getTargetDir();
+    } else if (linkedMoveWork.getLoadTableWork() != null && linkedMoveWork.getLoadFileWork()
== null) {
+      linkedSourcePath = linkedMoveWork.getLoadTableWork().getSourcePath();
+      linkedTargetPath = getTableLocationPath(conf, linkedMoveWork.getLoadTableWork().getTable());
+    } else {
+      return false;
+    }
+
+    return condOutputPath.equals(linkedSourcePath)
+        && BlobStorageUtils.isBlobStoragePath(conf, condInputPath)
+        && BlobStorageUtils.isBlobStoragePath(conf, linkedTargetPath);
+  }
+
+  /**
+   * Merges the given Conditional input path and the linked MoveWork into one only MoveWork.
+   * This is an optimization for BlobStore systems to avoid doing two renames or copies that
are not necessary.
+   *
+   * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks.
+   * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks.
+   * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork
as target.
+   */
+  @VisibleForTesting
+  protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) {
+    MoveWork newWork = new MoveWork(linkedMoveWork);
+    LoadFileDesc fileDesc = null;
+    LoadTableDesc tableDesc = null;
+
+    if (linkedMoveWork.getLoadFileWork() != null) {
+      fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork());
+      fileDesc.setSourcePath(condInputPath);
+    } else if (linkedMoveWork.getLoadTableWork() != null) {
+      tableDesc = new LoadTableDesc(linkedMoveWork.getLoadTableWork());
+      tableDesc.setSourcePath(condInputPath);
+    } else {
+      throw new IllegalArgumentException("Merging a path with a MoveWork with multi-files
work is not allowed.");
+    }
+
+    newWork.setLoadFileWork(fileDesc);
+    newWork.setLoadTableWork(tableDesc);
+
+    return newWork;
+  }
+
+  /**
    * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
    *
    * @param conf
    *          HiveConf
    * @param currTask
    *          current leaf task
-   * @param mvWork
+   * @param dummyMoveWork
    *          MoveWork for the move task
    * @param mergeWork
    *          MapredWork for the merge task.
-   * @param inputPath
+   * @param condInputPath
    *          the input directory of the merge/move task
+   * @param condOutputPath
+   *          the output directory of the merge/move task
+   * @param moveTaskToLink
+   *          a MoveTask that may be linked to the conditional sub-tasks
+   * @param dependencyTask
+   *          a dependency task that may be linked to the conditional sub-tasks
    * @return The conditional task
    */
-  @SuppressWarnings("unchecked")
-  public static ConditionalTask createCondTask(HiveConf conf,
-      Task<? extends Serializable> currTask, MoveWork mvWork,
-      Serializable mergeWork, String inputPath) {
+  private static ConditionalTask createCondTask(HiveConf conf,
+      Task<? extends Serializable> currTask, MoveWork dummyMoveWork, Serializable mergeWork,
+      Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink, DependencyCollectionTask
dependencyTask) {
+
+    boolean shouldMergeMovePaths = (moveTaskToLink != null && dependencyTask == null
+        && shouldMergeMovePaths(conf, condInputPath, condOutputPath, moveTaskToLink.getWork()));
+
+    MoveWork workForMoveOnlyTask;
+    if (shouldMergeMovePaths) {
+      workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork());
+    } else {
+      workForMoveOnlyTask = dummyMoveWork;
+    }
 
     // There are 3 options for this ConditionalTask:
     // 1) Merge the partitions
@@ -1773,9 +1737,9 @@ public final class GenMapRedUtils {
     // merge others) in this case the merge is done first followed by the move to prevent
     // conflicts.
     Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
-    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(workForMoveOnlyTask,
conf);
     Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork,
conf);
-    Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+    Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(dummyMoveWork,
conf);
 
     // NOTE! It is necessary merge task is the parent of the move task, and not
     // the other way around, for the proper execution of the execute method of
@@ -1783,7 +1747,7 @@ public final class GenMapRedUtils {
     mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
 
     List<Serializable> listWorks = new ArrayList<Serializable>();
-    listWorks.add(mvWork);
+    listWorks.add(workForMoveOnlyTask);
     listWorks.add(mergeWork);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -1799,12 +1763,26 @@ public final class GenMapRedUtils {
     // create resolver
     cndTsk.setResolver(new ConditionalResolverMergeFiles());
     ConditionalResolverMergeFilesCtx mrCtx =
-        new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+        new ConditionalResolverMergeFilesCtx(listTasks, condInputPath.toString());
     cndTsk.setResolverCtx(mrCtx);
 
     // make the conditional task as the child of the current leaf task
     currTask.addDependentTask(cndTsk);
 
+    if (shouldMergeMovePaths) {
+      // If a new MoveWork was created, then we should link all dependent tasks from the
MoveWork to link.
+      if (moveTaskToLink.getDependentTasks() != null) {
+        for (Task dependentTask : moveTaskToLink.getDependentTasks()) {
+          moveOnlyMoveTask.addDependentTask(dependentTask);
+        }
+      }
+    } else {
+      addDependentMoveTasks(moveTaskToLink, conf, moveOnlyMoveTask, dependencyTask);
+    }
+
+    addDependentMoveTasks(moveTaskToLink, conf, mergeOnlyMergeTask, dependencyTask);
+    addDependentMoveTasks(moveTaskToLink, conf, mergeAndMoveMoveTask, dependencyTask);
+
     return cndTsk;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index bcd3125..d708df3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@ -42,5 +42,8 @@ public class LoadDesc implements Serializable {
   public Path getSourcePath() {
     return sourcePath;
   }
-  
+
+  public void setSourcePath(Path sourcePath) {
+    this.sourcePath = sourcePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 9a868a0..03202fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -39,6 +39,16 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
   public LoadFileDesc() {
   }
 
+  public LoadFileDesc(final LoadFileDesc o) {
+    super(o.getSourcePath());
+
+    this.targetDir = o.targetDir;
+    this.isDfsDir = o.isDfsDir;
+    this.columns = o.columns;
+    this.columnTypes = o.columnTypes;
+    this.destinationCreateTable = o.destinationCreateTable;
+  }
+
   public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc  createViewDesc,
                       final Path sourcePath, final Path targetDir, final boolean isDfsDir,
                       final String columns, final String columnTypes) {

http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 771a919..aa77850 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -46,6 +46,18 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be
ordered map
 
+  public LoadTableDesc(final LoadTableDesc o) {
+    super(o.getSourcePath());
+
+    this.replace = o.replace;
+    this.dpCtx = o.dpCtx;
+    this.lbCtx = o.lbCtx;
+    this.inheritTableSpecs = o.inheritTableSpecs;
+    this.writeType = o.writeType;
+    this.table = o.table;
+    this.partitionSpec = o.partitionSpec;
+  }
+
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,

http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9f498c7..8ce211f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -83,6 +83,16 @@ public class MoveWork implements Serializable {
     this.checkFileFormat = checkFileFormat;
   }
 
+  public MoveWork(final MoveWork o) {
+    loadTableWork = o.getLoadTableWork();
+    loadFileWork = o.getLoadFileWork();
+    loadMultiFilesWork = o.getLoadMultiFilesWork();
+    checkFileFormat = o.getCheckFileFormat();
+    srcLocal = o.isSrcLocal();
+    inputs = o.getInputs();
+    outputs = o.getOutputs();
+  }
+
   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED
})
   public LoadTableDesc getLoadTableWork() {
     return loadTableWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/c0978844/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index e6ec445..68ccda9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -25,10 +25,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.*;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,9 +35,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
 public class TestGenMapRedUtilsCreateConditionalTask {
@@ -59,6 +56,98 @@ public class TestGenMapRedUtilsCreateConditionalTask {
   }
 
   @Test
+  public void testMovePathsThatCannotBeMerged() {
+    final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+    final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+    final MoveWork mockWork = mock(MoveWork.class);
+
+    assertFalse("A MoveWork null object cannot be merged.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, null));
+
+    hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "false");
+    assertFalse("Merging paths is not allowed when BlobStorage optimizations are disabled.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+    // Enable BlobStore optimizations for the rest of tests
+    hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "true");
+
+    reset(mockWork);
+    when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
+    assertFalse("Merging paths is not allowed when MultiFileWork is found in the MoveWork
object.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+    reset(mockWork);
+    when(mockWork.getLoadFileWork()).thenReturn(mock(LoadFileDesc.class));
+    when(mockWork.getLoadTableWork()).thenReturn(mock(LoadTableDesc.class));
+    assertFalse("Merging paths is not allowed when both LoadFileWork & LoadTableWork
are found in the MoveWork object.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+    reset(mockWork);
+    when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condInputPath, condOutputPath,
false, "", ""));
+    assertFalse("Merging paths is not allowed when both conditional output path is not equals
to MoveWork input path.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+    reset(mockWork);
+    when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, new Path("unused"),
false, "", ""));
+    assertFalse("Merging paths is not allowed when conditional input path is not a BlobStore
path.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, new Path("hdfs://hdfs-path"), condOutputPath,
mockWork));
+
+    reset(mockWork);
+    when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, new Path("hdfs://hdfs-path"),
false, "", ""));
+    assertFalse("Merging paths is not allowed when MoveWork output path is not a BlobStore
path.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+  }
+
+  @Test
+  public void testMovePathsThatCanBeMerged() {
+    final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+    final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+    final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
+    final MoveWork mockWork = mock(MoveWork.class);
+
+    when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, targetMoveWorkPath,
false, "", ""));
+
+    assertTrue("Merging BlobStore paths should be allowed.",
+        GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMergePathWithInvalidMoveWorkThrowsException() {
+    final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+    final MoveWork mockWork = mock(MoveWork.class);
+
+    when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
+    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+  }
+
+  @Test
+  public void testMergePathValidMoveWorkReturnsNewMoveWork() {
+    final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+    final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+    final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
+    final MoveWork mockWork = mock(MoveWork.class);
+    MoveWork newWork;
+
+    // test using loadFileWork
+    when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, targetMoveWorkPath,
false, "", ""));
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    assertNotNull(newWork);
+    assertNotEquals(newWork, mockWork);
+    assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
+    assertEquals(targetMoveWorkPath, newWork.getLoadFileWork().getTargetDir());
+
+    // test using loadTableWork
+    TableDesc tableDesc = new TableDesc();
+    reset(mockWork);
+    when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(condOutputPath, tableDesc,
null));
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    assertNotNull(newWork);
+    assertNotEquals(newWork, mockWork);
+    assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
+    assertTrue(newWork.getLoadTableWork().getTable().equals(tableDesc));
+  }
+
+  @Test
   public void testConditionalMoveTaskIsOptimized() throws SemanticException {
     hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "true");
 
@@ -91,10 +180,11 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     assertEquals(1, mergeOnlyTask.getChildTasks().size());
     verifyMoveTask(mergeOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
 
-    // Verify mergeAndMoveTask is optimized
+    // Verify mergeAndMoveTask is NOT optimized
     assertEquals(1, mergeAndMoveTask.getChildTasks().size());
-    assertNull(mergeAndMoveTask.getChildTasks().get(0).getChildTasks());
-    verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0), sinkDirName, tableLocation);
+    assertEquals(1, mergeAndMoveTask.getChildTasks().get(0).getChildTasks().size());
+    verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0), sinkDirName, finalDirName);
+    verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0).getChildTasks().get(0), finalDirName,
tableLocation);
   }
 
   @Test


Mime
View raw message