hive-commits mailing list archives

From omal...@apache.org
Subject [02/34] hive git commit: HIVE-11777 : implement an option to have single ETL strategy for multiple directories (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Wed, 18 Nov 2015 22:40:44 GMT
HIVE-11777 : implement an option to have single ETL strategy for multiple directories (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1b5b84a1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1b5b84a1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1b5b84a1

Branch: refs/heads/master-fixed
Commit: 1b5b84a18f846e6145254cea38e633fcb7c0cb37
Parents: 6ee4726
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Tue Nov 17 14:56:39 2015 -0800
Committer: Owen O'Malley <omalley@apache.org>
Committed: Wed Nov 18 13:56:13 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 189 ++++++++++++++++---
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  94 +++++++--
 3 files changed, 241 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
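The change adds one configuration key, hive.orc.splits.directory.batch.ms (see HiveConf.java below), which sets how long split generation may wait while batching input directories into a single ETL split strategy; 0 (the default) keeps the old per-directory behavior. As a quick illustration, a hypothetical client-side snippet that turns the batching on. The batch key comes from this commit; the split-strategy key (hive.exec.orc.split.strategy) and the chosen values are assumptions for illustration only, not part of the patch:

import org.apache.hadoop.conf.Configuration;

public class EnableOrcDirectoryBatching {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed existing key for the ORC split strategy; forcing "ETL" is optional,
    // the HYBRID default can also pick the ETL strategy per directory.
    conf.set("hive.exec.orc.split.strategy", "ETL");
    // Key added in this commit: batch input directories for up to 1000 ms during split generation.
    conf.setInt("hive.orc.splits.directory.batch.ms", 1000);
    System.out.println(conf.get("hive.orc.splits.directory.batch.ms"));
  }
}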


http://git-wip-us.apache.org/repos/asf/hive/blob/1b5b84a1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 838f25c..953e52c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1089,6 +1089,10 @@ public class HiveConf extends Configuration {
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
         "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
         "data is read remotely (from the client or HS2 machine) and sent to all the tasks."),
+    HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS("hive.orc.splits.directory.batch.ms", 0,
+        "How long, in ms, to wait to batch input directories for processing during ORC split\n" +
+        "generation. 0 means process directories individually. This can increase the number of\n" +
+        "metastore calls if metastore metadata cache is used."),
     HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
         "Include file ID in splits on file systems thaty support it."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,

http://git-wip-us.apache.org/repos/asf/hive/blob/1b5b84a1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index bee0831..46862da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.codec.binary.Hex;
@@ -468,6 +469,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private static MetastoreCache metaCache;
     private static ExecutorService threadPool = null;
     private final int numBuckets;
+    private final int splitStrategyBatchMs;
     private final long maxSize;
     private final long minSize;
     private final int minSplits;
@@ -499,6 +501,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
       numBuckets =
           Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0);
+      splitStrategyBatchMs = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS);
       LOG.debug("Number of buckets specified by conf file is " + numBuckets);
       int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
           ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
@@ -610,21 +613,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * (split generation reads and caches file footers).
    */
   static final class ETLSplitStrategy implements SplitStrategy<SplitInfo>, Callable<Void> {
+    private static final int ETL_COMBINE_FILE_LIMIT = 500;
+
+    private static class ETLDir {
+      public ETLDir(Path dir, FileSystem fs, int fileCount) {
+        this.dir = dir;
+        this.fs = fs;
+        this.fileCount = fileCount;
+      }
+      private final int fileCount;
+      private final Path dir;
+      private final FileSystem fs;
+    }
+
+
     Context context;
-    FileSystem fs;
+    List<ETLDir> dirs;
     List<HdfsFileStatusWithId> files;
     boolean isOriginal;
     List<DeltaMetaData> deltas;
-    Path dir;
     boolean[] covered;
     private List<Future<List<OrcSplit>>> splitFuturesRef;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
         boolean[] covered) {
+      assert !children.isEmpty();
       this.context = context;
-      this.dir = dir;
-      this.fs = fs;
+      this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size()));
       this.files = children;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
@@ -634,19 +650,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     @Override
     public List<SplitInfo> getSplits() throws IOException {
       List<SplitInfo> result = new ArrayList<>(files.size());
-      // TODO: Right now, we do the metastore call here, so there will be a metastore call per
-      //       partition. If we had a sync point after getting file lists, we could make just one
-      //       call; this might be too much sync for many partitions and also cause issues with the
-      //       huge metastore call result that cannot be handled with in-API batching. To have an
-      //       optimal number of metastore calls, we should wait for batch-size number of files (a
-      //       few hundreds) to become available, then call metastore.
-
       // Force local cache if we have deltas.
       FooterCache cache = context.cacheStripeDetails ?
           (deltas == null ? context.footerCache : Context.localCache) : null;
       if (cache != null) {
         FileInfo[] infos = cache.getAndValidate(files);
+        int dirIx = -1, fileInDirIx = -1, filesInDirCount = 0;
+        ETLDir dir = null;
         for (int i = 0; i < files.size(); ++i) {
+          if ((++fileInDirIx) == filesInDirCount) {
+            dir = dirs.get(++dirIx);
+            filesInDirCount = dir.fileCount;
+          }
           FileInfo info = infos[i];
           if (info != null) {
             // Cached copy is valid
@@ -656,15 +671,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           // ignore files of 0 length
           if (file.getFileStatus().getLen() > 0) {
             result.add(new SplitInfo(
-                context, fs, file, info, isOriginal, deltas, true, dir, covered));
+                context, dir.fs, file, info, isOriginal, deltas, true, dir.dir, covered));
           }
         }
       } else {
+        int dirIx = -1, fileInDirIx = -1, filesInDirCount = 0;
+        ETLDir dir = null;
         for (HdfsFileStatusWithId file : files) {
+          if ((++fileInDirIx) == filesInDirCount) {
+            dir = dirs.get(++dirIx);
+            filesInDirCount = dir.fileCount;
+          }
           // ignore files of 0 length
           if (file.getFileStatus().getLen() > 0) {
             result.add(new SplitInfo(
-                context, fs, file, null, isOriginal, deltas, true, dir, covered));
+                context, dir.fs, file, null, isOriginal, deltas, true, dir.dir, covered));
           }
         }
       }
@@ -673,7 +694,39 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     @Override
     public String toString() {
-      return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
+      if (dirs.size() == 1) {
+        return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dirs.get(0).dir;
+      } else {
+        StringBuilder sb = new StringBuilder(ETLSplitStrategy.class.getSimpleName()
+            + " strategy for ");
+        boolean isFirst = true;
+        for (ETLDir dir : dirs) {
+          if (!isFirst) sb.append(", ");
+          isFirst = false;
+          sb.append(dir.dir);
+        }
+        return sb.toString();
+      }
+    }
+
+    enum CombineResult {
+      YES, // Combined, all good.
+      NO_AND_CONTINUE, // Don't combine with that, but may combine with others.
+      NO_AND_SWAP // Don't combine with that, and make that a base for new combines.
+      // We may add NO_AND_STOP in future where combine is impossible and other should not be base.
+    }
+
+    public CombineResult combineWith(FileSystem fs, Path dir,
+        List<HdfsFileStatusWithId> otherFiles, boolean isOriginal) {
+      if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT
+        || this.isOriginal != isOriginal) {
+        return (files.size() > otherFiles.size())
+            ? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE;
+      }
+      // All good, combine the base/original only ETL strategies.
+      files.addAll(otherFiles);
+      dirs.add(new ETLDir(dir, fs, otherFiles.size()));
+      return CombineResult.YES;
     }
 
     public Future<Void> generateSplitWork(
@@ -926,7 +979,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         hosts = start.getHosts();
       } else {
         Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);
-        BlockLocation end = endEntry.getValue();
         //get the submap
         NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(),
                   true, endEntry.getKey(), true);
@@ -1115,6 +1167,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return generateSplitsInfo(conf, -1);
   }
 
+  /** Class intended to update two values from methods... Java-related cruft. */
+  @VisibleForTesting
+  static final class CombinedCtx {
+    ETLSplitStrategy combined;
+    long combineStartUs;
+  }
+
   static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
       throws IOException {
     // Use threads to resolve directories into splits.
@@ -1143,23 +1202,43 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // complete path futures and schedule split generation
     try {
-      for (int notIndex = 0; notIndex < paths.length; ++notIndex) {
-        AcidDirInfo adi = ecs.take().get();
-        SplitStrategy<?> splitStrategy = determineSplitStrategy(
-            context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+      CombinedCtx combinedCtx = (context.splitStrategyBatchMs > 0) ? new CombinedCtx() : null;
+      long maxWaitUs = context.splitStrategyBatchMs * 1000000;
+      int resultsLeft = paths.length;
+      while (resultsLeft > 0) {
+        AcidDirInfo adi = null;
+        if (combinedCtx != null && combinedCtx.combined != null) {
+          long waitTimeUs = combinedCtx.combineStartUs + maxWaitUs - System.nanoTime();
+          if (waitTimeUs >= 0) {
+            Future<AcidDirInfo> f = ecs.poll(waitTimeUs, TimeUnit.NANOSECONDS);
+            adi = (f == null) ? null : f.get();
+          }
+        } else {
+          adi = ecs.take().get();
+        }
+
+        if (adi == null) {
+          // We were combining SS-es and the time has expired.
+          assert combinedCtx.combined != null;
+          scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+          combinedCtx.combined = null;
+          continue;
+        }
+
+        // We have received a new directory information, make a split strategy.
+        --resultsLeft;
+        SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context,
+            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+        if (splitStrategy == null) continue; // Combined.
 
         if (isDebugEnabled) {
-          LOG.debug("Split strategy: ", splitStrategy);
+          LOG.debug("Split strategy: {}", splitStrategy);
         }
 
         // Hack note - different split strategies return differently typed lists, yay Java.
         // This works purely by magic, because we know which strategy produces which type.
         if (splitStrategy instanceof ETLSplitStrategy) {
-          Future<Void> ssFuture = ((ETLSplitStrategy)splitStrategy).generateSplitWork(
-              context, splitFutures);
-          if (ssFuture != null) {
-            strategyFutures.add(ssFuture);
-          }
+          scheduleSplits((ETLSplitStrategy)splitStrategy, context, splitFutures, strategyFutures);
         } else {
           @SuppressWarnings("unchecked")
           List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
@@ -1167,6 +1246,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         }
       }
 
+      // Run the last combined strategy, if any.
+      if (combinedCtx != null && combinedCtx.combined != null) {
+        scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+        combinedCtx.combined = null;
+      }
+
       // complete split futures
       for (Future<Void> ssFuture : strategyFutures) {
          ssFuture.get(); // Make sure we get exceptions strategies might have thrown.
@@ -1196,12 +1281,49 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return splits;
   }
 
+  private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context context,
+      List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures)
+          throws IOException {
+    Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures);
+    if (ssFuture == null) return;
+    strategyFutures.add(ssFuture);
+  }
+
   private static <T> void cancelFutures(List<Future<T>> futures) {
     for (Future<T> future : futures) {
       future.cancel(true);
     }
   }
 
+  private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx combinedCtx,
+      Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> files,
+      List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal) {
+    if (!deltas.isEmpty() || combinedCtx == null) {
+      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered);
+    } else if (combinedCtx.combined == null) {
+      combinedCtx.combined = new ETLSplitStrategy(
+          context, fs, dir, files, isOriginal, deltas, covered);
+      combinedCtx.combineStartUs = System.nanoTime();
+      return null;
+    } else {
+      ETLSplitStrategy.CombineResult r =
+          combinedCtx.combined.combineWith(fs, dir, files, isOriginal);
+      switch (r) {
+      case YES: return null;
+      case NO_AND_CONTINUE:
+        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered);
+      case NO_AND_SWAP: {
+        ETLSplitStrategy oldBase = combinedCtx.combined;
+        combinedCtx.combined = new ETLSplitStrategy(
+            context, fs, dir, files, isOriginal, deltas, covered);
+        combinedCtx.combineStartUs = System.nanoTime();
+        return oldBase;
+      }
+      default: throw new AssertionError("Unknown result " + r);
+      }
+    }
+  }
+
   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
@@ -1532,8 +1654,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   @VisibleForTesting
-  static SplitStrategy determineSplitStrategy(Context context, FileSystem fs, Path dir,
-      AcidUtils.Directory dirInfo, List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+  static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
+      FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+      List<HdfsFileStatusWithId> baseOrOriginalFiles) {
     Path base = dirInfo.getBaseDirectory();
     List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
     List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
@@ -1561,16 +1684,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       switch(context.splitStrategyKind) {
         case BI:
           // BI strategy requested through config
-          return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+          return new BISplitStrategy(
+              context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
         case ETL:
           // ETL strategy requested through config
-          return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+          return combineOrCreateETLStrategy(combinedCtx, context, fs,
+            dir, baseOrOriginalFiles, deltas, covered, isOriginal);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
-            return new ETLSplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+            return combineOrCreateETLStrategy(combinedCtx, context, fs,
+                dir, baseOrOriginalFiles, deltas, covered, isOriginal);
           } else {
-            return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+            return new BISplitStrategy(
+                context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
           }
       }
     } else {
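
The heart of the patch is the loop in generateSplitsInfo above: completed per-directory listings are folded into one combined ETLSplitStrategy via combineOrCreateETLStrategy, and the combined strategy is flushed (scheduleSplits) once the hive.orc.splits.directory.batch.ms window expires or all results are in. Below is a minimal, self-contained sketch of that timed ExecutorCompletionService.poll pattern; it is not the Hive code, and the task payloads, names, and timings are made up for illustration:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BatchingPollSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
    int dirCount = 8;
    for (int i = 0; i < dirCount; ++i) {
      final int dir = i;
      // Stand-in for the per-directory file-listing work.
      ecs.submit(() -> { Thread.sleep(40L * dir); return "dir-" + dir; });
    }
    long maxWaitNs = TimeUnit.MILLISECONDS.toNanos(100); // stand-in for ...directory.batch.ms
    List<String> combined = new ArrayList<>();
    long batchStartNs = 0;
    int resultsLeft = dirCount;
    while (resultsLeft > 0) {
      Future<String> f;
      if (!combined.isEmpty()) {
        // A batch is open: wait only until its window expires.
        long waitNs = batchStartNs + maxWaitNs - System.nanoTime();
        f = (waitNs > 0) ? ecs.poll(waitNs, TimeUnit.NANOSECONDS) : null;
        if (f == null) {
          System.out.println("flush " + combined); // window expired: schedule the combined work
          combined.clear();
          continue;
        }
      } else {
        f = ecs.take(); // no open batch: block for the next result
      }
      --resultsLeft;
      if (combined.isEmpty()) {
        batchStartNs = System.nanoTime(); // first entry starts the batch window
      }
      combined.add(f.get());
    }
    if (!combined.isEmpty()) {
      System.out.println("flush " + combined); // flush whatever is left at the end
    }
    pool.shutdown();
  }
}

In the actual patch the combine step also refuses to merge directories that carry delta files or that would push the combined strategy past ETL_COMBINE_FILE_LIMIT (500 files); in those cases combineWith returns NO_AND_CONTINUE or NO_AND_SWAP and the rejected strategy is scheduled on its own.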

http://git-wip-us.apache.org/repos/asf/hive/blob/1b5b84a1/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index c61d615..ec90481 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -109,6 +107,7 @@ import org.junit.rules.TestName;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Output;
 
+@SuppressWarnings({ "deprecation", "unchecked", "rawtypes" })
 public class TestInputOutputFormat {
 
   public static String toKryo(SearchArgument sarg) {
@@ -524,14 +523,90 @@ public class TestInputOutputFormat {
             new MockPath(fs, "mock:/a/b"), false);
     splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
+  }
+
+  @Test
+  public void testEtlCombinedStrategy() throws Exception {
+    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/1/part-00", 1000, new byte[0]),
+        new MockFile("mock:/a/1/part-01", 1000, new byte[0]),
+        new MockFile("mock:/a/2/part-00", 1000, new byte[0]),
+        new MockFile("mock:/a/2/part-01", 1000, new byte[0]),
+        new MockFile("mock:/a/3/base_0/1", 1000, new byte[0]),
+        new MockFile("mock:/a/4/base_0/1", 1000, new byte[0]),
+        new MockFile("mock:/a/5/base_0/1", 1000, new byte[0]),
+        new MockFile("mock:/a/5/delta_0_25/1", 1000, new byte[0])
+    );
+
+    OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
+    // The first directory becomes the base for combining.
+    SplitStrategy<?> ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
+    assertNull(ss);
+    assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+    OrcInputFormat.ETLSplitStrategy etlSs = (OrcInputFormat.ETLSplitStrategy)combineCtx.combined;
+    assertEquals(2, etlSs.files.size());
+    assertTrue(etlSs.isOriginal);
+    assertEquals(1, etlSs.dirs.size());
+    // The second one should be combined into the first.
+    ss = createOrCombineStrategy(context, fs, "mock:/a/2", combineCtx);
+    assertNull(ss);
+    assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+    assertEquals(4, etlSs.files.size());
+    assertEquals(2, etlSs.dirs.size());
+    // The third one has the base file, so it shouldn't be combined but could be a base.
+    ss = createOrCombineStrategy(context, fs, "mock:/a/3", combineCtx);
+    assertSame(etlSs, ss);
+    assertEquals(4, etlSs.files.size());
+    assertEquals(2, etlSs.dirs.size());
+    assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+    etlSs = (OrcInputFormat.ETLSplitStrategy)combineCtx.combined;
+    assertEquals(1, etlSs.files.size());
+    assertFalse(etlSs.isOriginal);
+    assertEquals(1, etlSs.dirs.size());
+    // Try the first again, it would not be combined and we'd retain the old base (less files).
+    ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
+    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
+    assertNotSame(etlSs, ss);
+    OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss;
+    assertEquals(2, rejectedEtlSs.files.size());
+    assertEquals(1, rejectedEtlSs.dirs.size());
+    assertTrue(rejectedEtlSs.isOriginal);
+    assertEquals(1, etlSs.files.size());
+    assertEquals(1, etlSs.dirs.size());
+    // The fourth could be combined again.
+    ss = createOrCombineStrategy(context, fs, "mock:/a/4", combineCtx);
+    assertNull(ss);
+    assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+    assertEquals(2, etlSs.files.size());
+    assertEquals(2, etlSs.dirs.size());
+    // The fifth will not be combined because of delta files.
+    ss = createOrCombineStrategy(context, fs, "mock:/a/5", combineCtx);
+    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
+    assertNotSame(etlSs, ss);
+    assertEquals(2, etlSs.files.size());
+    assertEquals(2, etlSs.dirs.size());
+  }
+
+  public SplitStrategy<?> createOrCombineStrategy(OrcInputFormat.Context context,
+      MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
+    OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
+    return OrcInputFormat.determineSplitStrategy(
+        combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+  }
 
+  public OrcInputFormat.AcidDirInfo createAdi(
+      OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException {
+    return new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, path), false).call();
   }
 
   private OrcInputFormat.SplitStrategy createSplitStrategy(
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategy(
-        context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
   }
 
   public static class MockBlock {
@@ -694,7 +769,6 @@ public class TestInputOutputFormat {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
 
-    @SuppressWarnings("unused")
     public MockFileSystem() {
       // empty
     }
@@ -1104,7 +1178,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("unchecked,deprecation")
   public void testInOutFormat() throws Exception {
     Properties properties = new Properties();
     properties.setProperty("columns", "x,y");
@@ -1236,7 +1309,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("unchecked,deprecation")
   public void testMROutput() throws Exception {
     Properties properties = new Properties();
     StructObjectInspector inspector;
@@ -1293,7 +1365,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("deprecation")
   public void testEmptyFile() throws Exception {
     Properties properties = new Properties();
     properties.setProperty("columns", "x,y");
@@ -1350,7 +1421,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("unchecked,deprecation")
   public void testDefaultTypes() throws Exception {
     Properties properties = new Properties();
     properties.setProperty("columns", "str,str2");
@@ -1517,7 +1587,6 @@ public class TestInputOutputFormat {
    * @throws Exception
    */
   @Test
-  @SuppressWarnings("unchecked")
   public void testVectorization() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1565,7 +1634,6 @@ public class TestInputOutputFormat {
    * @throws Exception
    */
   @Test
-  @SuppressWarnings("unchecked")
   public void testVectorizationWithBuckets() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1611,7 +1679,6 @@ public class TestInputOutputFormat {
 
   // test acid with vectorization, no combine
   @Test
-  @SuppressWarnings("unchecked")
   public void testVectorizationWithAcid() throws Exception {
     StructObjectInspector inspector = new BigRowInspector();
     JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
@@ -1682,7 +1749,6 @@ public class TestInputOutputFormat {
 
   // test non-vectorized, non-acid, combine
   @Test
-  @SuppressWarnings("unchecked")
   public void testCombinationInputFormat() throws Exception {
     // get the object inspector for MyRow
     StructObjectInspector inspector;
@@ -1891,7 +1957,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("unchecked,deprecation")
   public void testSplitElimination() throws Exception {
     Properties properties = new Properties();
     properties.setProperty("columns", "z,r");
@@ -1933,7 +1998,6 @@ public class TestInputOutputFormat {
   }
 
   @Test
-  @SuppressWarnings("unchecked,deprecation")
   public void testSplitEliminationNullStats() throws Exception {
     Properties properties = new Properties();
     StructObjectInspector inspector;

