hive-commits mailing list archives

From ekoif...@apache.org
Subject [1/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
Date Wed, 16 Aug 2017 01:14:43 GMT
Repository: hive
Updated Branches:
  refs/heads/master 506168370 -> 34b0e07a3


http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ba8d675..82cf108 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -18,11 +18,20 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.MemoryManagerImpl;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +67,10 @@ import org.mockito.Mockito;
 import com.google.common.collect.Lists;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -283,28 +295,30 @@ public class TestOrcRawRecordMerger {
 
   @Test
   public void testOriginalReaderPair() throws Exception {
+    int BUCKET = 10;
     ReaderKey key = new ReaderKey();
+    Configuration conf = new Configuration();
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
     Reader reader = createMockOriginalReader();
-    RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
-    RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
+    RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1);
+    RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3);
     boolean[] includes = new boolean[]{true, true};
-    Configuration conf = new Configuration();
     FileSystem fs = FileSystem.getLocal(conf);
     Path root = new Path(tmpDir, "testOriginalReaderPair");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey,
         new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
     RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("third", value(pair.nextRecord()));
 
     pair.next(pair.nextRecord());
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("fourth", value(pair.nextRecord()));
@@ -320,46 +334,48 @@ public class TestOrcRawRecordMerger {
 
   @Test
   public void testOriginalReaderPairNoMin() throws Exception {
+    int BUCKET = 10;
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
     Configuration conf = new Configuration();
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
     FileSystem fs = FileSystem.getLocal(conf);
     Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null,
         new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
     assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("second", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("third", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("fourth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("fifth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
@@ -506,6 +522,53 @@ public class TestOrcRawRecordMerger {
     return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  /**
+   * {@link org.apache.hive.hcatalog.streaming.TestStreaming#testInterleavedTransactionBatchCommits} has more tests
+   */
+  @Test
+  public void testGetLogicalLength() throws Exception {
+    final int BUCKET = 0;
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    OrcOutputFormat of = new OrcOutputFormat();
+    Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+        (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    /*create delta_1_1_0/bucket0 with 1 row and close the file*/
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .inspector(inspector).bucket(BUCKET).writingBase(false).minimumTransactionId(1)
+      .maximumTransactionId(1).finalDestination(root);
+    Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir(
+      options.getMinimumTransactionId(), options.getMaximumTransactionId(), options.getStatementId()));
+    Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET);
+    Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0);
+
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    ru.insert(options.getMaximumTransactionId(), new MyRow("first"));
+    ru.close(false);
+
+    FileStatus bucket0File = fs.getFileStatus(bucket0);
+    AcidUtils.getLogicalLength(fs, bucket0File);
+    Assert.assertTrue("no " + bucket0, fs.exists(bucket0));
+    Assert.assertFalse("unexpected " + bucket0SideFile, fs.exists(bucket0SideFile));
+    //test getLogicalLength() w/o side file
+    Assert.assertEquals("closed file size mismatch", bucket0File.getLen(),
+      AcidUtils.getLogicalLength(fs, bucket0File));
+
+    //create an empty (invalid) side file - make sure getLogicalLength() throws
+    FSDataOutputStream flushLengths = fs.create(bucket0SideFile, true, 8);
+    flushLengths.close();
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage(bucket0SideFile.getName() + " found but is not readable");
+    AcidUtils.getLogicalLength(fs, bucket0File);
+  }
   @Test
   public void testEmpty() throws Exception {
     final int BUCKET = 0;
@@ -525,7 +588,16 @@ public class TestOrcRawRecordMerger {
         .inspector(inspector).bucket(BUCKET).writingBase(true)
         .maximumTransactionId(100).finalDestination(root);
     of.getRecordUpdater(root, options).close(false);
-
+    {
+      /*OrcRecordUpdater is inconsistent about when it creates empty files and when it does not.
+      This creates an empty bucket. HIVE-17138*/
+      OrcFile.WriterOptions wo = OrcFile.writerOptions(conf);
+      wo.inspector(inspector);
+      wo.callback(new OrcRecordUpdater.KeyIndexBuilder("testEmpty"));
+      Writer w = OrcFile.createWriter(AcidUtils.createBucketFile(new Path(root,
+        AcidUtils.baseDir(100)), BUCKET), wo);
+      w.close();
+    }
     ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
@@ -600,52 +672,58 @@ public class TestOrcRawRecordMerger {
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
     assertEquals(new Path(root, use130Format ?
-        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+        AcidUtils.deleteDeltaSubdir(200,200,0) : AcidUtils.deleteDeltaSubdir(200,200)),
         directory.getCurrentDirectories().get(0).getPath());
+    assertEquals(new Path(root, use130Format ?
+        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+      directory.getCurrentDirectories().get(1).getPath());
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
         BUCKET);
+    Path deltaPath = AcidUtils.createBucketFile(directory.getCurrentDirectories().get(1).getPath(),
+      BUCKET);
+    Path deleteDeltaDir = directory.getCurrentDirectories().get(0).getPath();
 
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(conf,true);
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
 
+    //the first "split" is for base/
     Reader baseReader = OrcFile.createReader(basePath,
         OrcFile.readerOptions(conf));
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+            new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
     assertEquals(null, merger.getMinKey());
     assertEquals(null, merger.getMaxKey());
     RecordIdentifier id = merger.createKey();
     OrcStruct event = merger.createValue();
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
-    assertEquals("update 1", getValue(event));
-    assertFalse(merger.isDelete(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
-    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
-    assertEquals("update 2", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
-    assertEquals("update 3", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -667,14 +745,13 @@ public class TestOrcRawRecordMerger {
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
-    assertTrue(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
@@ -687,109 +764,205 @@ public class TestOrcRawRecordMerger {
     assertEquals(false, merger.next(id, event));
     merger.close();
 
-    // make a merger that doesn't collapse events
-    merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
-            createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+    //second "split" is delta_200_200
+    baseReader = OrcFile.createReader(deltaPath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+        createMaximalTxnList(), new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
     assertEquals("update 1", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals("update 3", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    //now run as if it's a minor Compaction so we don't collapse events
+    //it's not exactly like minor compaction since MC would not have a baseReader
+    //here there is only 1 "split" since we only have data for 1 bucket
+    baseReader = OrcFile.createReader(basePath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        createMaximalTxnList(), new Reader.Options(),
+        AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
     assertEquals("first", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
-    assertEquals("update 2", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
     assertEquals("third", getValue(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
-    assertEquals("update 3", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
     assertEquals("fourth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
     assertEquals("fifth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
     assertEquals("sixth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
     assertEquals("seventh", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
     assertEquals("eighth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
+
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
     assertEquals("ninth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
     assertEquals("tenth", getValue(event));
 
+    //data from delta_200_200
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
+    assertEquals("update 1", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals("update 3", getValue(event));
+
     assertEquals(false, merger.next(id, event));
     merger.close();
 
+
     // try ignoring the 200 transaction and make sure it works still
     ValidTxnList txns = new ValidReadTxnList("2000:200:200");
+    //again 1st split is for base/
+    baseReader = OrcFile.createReader(basePath,
+      OrcFile.readerOptions(conf));
     merger =
-        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
-            txns, new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        txns, new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
+
     for(int i=0; i < values.length; ++i) {
       assertEquals(true, merger.next(id, event));
       LOG.info("id = " + id + "event = " + event);
@@ -798,7 +971,19 @@ public class TestOrcRawRecordMerger {
       assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id);
       assertEquals(values[i], getValue(event));
     }
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    // 2nd split is for delta_200_200 which is filtered out entirely by "txns"
+    baseReader = OrcFile.createReader(deltaPath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        txns, new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
 
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
     assertEquals(false, merger.next(id, event));
     merger.close();
   }
@@ -846,6 +1031,7 @@ public class TestOrcRawRecordMerger {
    * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
    * a base and a delta.
    * @throws Exception
+   * @see #testRecordReaderNewBaseAndDelta()
    */
   @Test
   public void testRecordReaderOldBaseAndDelta() throws Exception {
@@ -891,63 +1077,90 @@ public class TestOrcRawRecordMerger {
         .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
         .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5)
         .finalDestination(root);
+
+    final int BUCKET_PROPERTY = BucketCodec.V1.encode(options);
+
     RecordUpdater ru = of.getRecordUpdater(root, options);
     values = new String[]{"0.0", null, null, "1.1", null, null, null,
         "ignore.7"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(9, 0, BUCKET));
-    ru.close(false);
+    ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
+    ru.close(false);//this doesn't create a key index presumably because writerOptions are not set on 'options'
 
     // write a delta
-    options = options.minimumTransactionId(2).maximumTransactionId(2);
+    options = options.minimumTransactionId(100).maximumTransactionId(100);
     ru = of.getRecordUpdater(root, options);
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
+    for(int i=0; i < values.length - 1; ++i) {
       if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(8, 0, BUCKET));
+    //do this before the next update so that delete_delta is properly sorted
+    ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+    //because row 8 was updated and thus has a different RecordIdentifier now
+    ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
+
     ru.close(false);
 
+    MyResult[] expected = new MyResult[10];
+    int k = 0;
+    expected[k++] = new MyResult(0, "0.0");
+    expected[k++] = new MyResult(1, "0.1");
+    expected[k++] = new MyResult(2, "1.0");
+    expected[k++] = new MyResult(3, "1.1");
+    expected[k++] = new MyResult(4, "2.0");
+    expected[k++] = new MyResult(5, "2.1");
+    expected[k++] = new MyResult(6, "3.0");
+    expected[k] = new MyResult(7, "3.1");
+
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set("mapred.min.split.size", "1");
     job.set("mapred.max.split.size", "2");
     job.set("mapred.input.dir", root.toString());
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
+    assertEquals(7, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
 
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+    for(InputSplit split : splits) {
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
       NullWritable key = rr.createKey();
       OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      while(rr.next(key, value)) {
+        MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+        int i = 0;
+        for(; i < expected.length; i++) {
+          if(mr.equals(expected[i])) {
+            expected[i] = null;
+            break;
+          }
+        }
+        if(i >= expected.length) {
+          //not found
+          assertTrue("Found unexpected row: " + mr, false );
+        }
       }
-      assertEquals(false, rr.next(key, value));
     }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+    for(MyResult mr : expected) {
+      assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+    }
   }
 
   /**
    * Test the RecordReader when there is a new base and a delta.
+   * This test creates multiple stripes in both base and delta files which affects how many splits
+   * are created on read.  With ORC-228 this could be done in E2E fashion with a query or
+   * streaming ingest writing data.
+   * @see #testRecordReaderOldBaseAndDelta()
    * @throws Exception
    */
   @Test
@@ -1009,20 +1222,33 @@ public class TestOrcRawRecordMerger {
         ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY));
+    ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
     ru.close(false);
 
     // write a delta
-    options.minimumTransactionId(2).maximumTransactionId(2);
+    options.minimumTransactionId(100).maximumTransactionId(100);
     ru = of.getRecordUpdater(root, options);
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
+    for(int i=0; i < values.length - 1; ++i) {
       if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
+        ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
+    //do this before the next update so that delete_delta is properly sorted
     ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+    //because row 8 was updated and thus has a different RecordIdentifier now
+    ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
     ru.close(false);
+    MyResult[] expected = new MyResult[10];
+    int k = 0;
+    expected[k++] = new MyResult(0, "0.0");
+    expected[k++] = new MyResult(1, "0.1");
+    expected[k++] = new MyResult(2, "1.0");
+    expected[k++] = new MyResult(3, "1.1");
+    expected[k++] = new MyResult(4, "2.0");
+    expected[k++] = new MyResult(5, "2.1");
+    expected[k++] = new MyResult(6, "3.0");
+    expected[k] = new MyResult(7, "3.1");
 
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
@@ -1031,31 +1257,56 @@ public class TestOrcRawRecordMerger {
     job.set("mapred.input.dir", root.toString());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
+    //base has 10 rows, so 5 splits, 1 delta has 2 rows so 1 split, and 1 delta has 3 so 2 splits
+    assertEquals(8, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
 
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i + " = " + splits[i]);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+    for(InputSplit split : splits) {
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
       NullWritable key = rr.createKey();
       OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      while(rr.next(key, value)) {
+        MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+        int i = 0;
+        for(; i < expected.length; i++) {
+          if(mr.equals(expected[i])) {
+            expected[i] = null;
+            break;
+          }
+        }
+        if(i >= expected.length) {
+          //not found
+          assertTrue("Found unexpected row: " + mr, false );
+        }
       }
-      assertEquals(false, rr.next(key, value));
     }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+    for(MyResult mr : expected) {
+      assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+    }
+  }
+  private static final class MyResult {
+    private final int myInt;
+    private final String myText;
+    MyResult(int myInt, String myText) {
+      this.myInt = myInt;
+      this.myText = myText;
+    }
+    @Override
+    public boolean equals(Object t) {
+      if(!(t instanceof MyResult)) {
+        return false;
+      }
+      MyResult that = (MyResult)t;
+      return myInt == that.myInt && myText.equals(that.myText);
+    }
+    @Override
+    public String toString() {
+      return "(" + myInt + "," + myText +")";
+    }
   }
-
   /**
    * Test the RecordReader when there is a new base and a delta.
    * @throws Exception
@@ -1081,18 +1332,17 @@ public class TestOrcRawRecordMerger {
             .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
           .finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
-    String[] values = new String[]{"a", "b", "c", "d", "e"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(1, new MyRow(values[i]));
+    String[][] values = {new String[]{"a", "b", "c", "d", "e"}, new String[]{"f", "g", "h", "i", "j"}};
+    for(int i=0; i < values[0].length; ++i) {
+      ru.insert(1, new MyRow(values[0][i]));
     }
     ru.close(false);
 
     // write a delta
     options.minimumTransactionId(2).maximumTransactionId(2);
     ru = of.getRecordUpdater(root, options);
-    values = new String[]{"f", "g", "h", "i", "j"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(2, new MyRow(values[i]));
+    for(int i=0; i < values[1].length; ++i) {
+      ru.insert(2, new MyRow(values[1][i]));
     }
     ru.close(false);
 
@@ -1104,19 +1354,23 @@ public class TestOrcRawRecordMerger {
     job.set("bucket_count", "1");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
-    OrcStruct row = rr.createValue();
-    for(int i = 0; i < values.length; ++i) {
-      System.out.println("Checking " + i);
-      assertEquals(true, rr.next(NullWritable.get(), row));
-      assertEquals(values[i], row.getFieldValue(0).toString());
+    for(int j = 0; j < splits.length; j++) {
+      InputSplit split = splits[j];
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
+      OrcStruct row = rr.createValue();
+      for (int i = 0; i < values[j].length; ++i) {
+        System.out.println("Checking " + i);
+        String msg = "split[" + j + "] at i=" + i;
+        assertEquals(msg, true, rr.next(NullWritable.get(), row));
+        assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
+      }
+      assertEquals(false, rr.next(NullWritable.get(), row));
     }
-    assertEquals(false, rr.next(NullWritable.get(), row));
   }
 
   /**
@@ -1174,11 +1428,13 @@ public class TestOrcRawRecordMerger {
     job.set("bucket_count", "2");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
 
     // read the keys before the delta is flushed
     InputSplit[] splits = inf.getSplits(job, 1);
-    assertEquals(2, splits.length);
+    //1 split since we only have 1 bucket file in base/.  delta is not flushed (committed) yet, i.e. empty
+    assertEquals(1, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
         inf.getRecordReader(splits[0], job, Reporter.NULL);
     NullWritable key = rr.createKey();
@@ -1201,17 +1457,24 @@ public class TestOrcRawRecordMerger {
 
     splits = inf.getSplits(job, 1);
     assertEquals(2, splits.length);
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
     Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
       AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
     assertEquals(true, fs.exists(sideFile));
-    assertEquals(24, fs.getFileStatus(sideFile).getLen());
+    assertEquals(32, fs.getFileStatus(sideFile).getLen());
 
-    for(int i=1; i < 11; ++i) {
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    for(int i=1; i <= 5; ++i) {
       assertEquals(true, rr.next(key, value));
       assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
     }
     assertEquals(false, rr.next(key, value));
+
+    rr = inf.getRecordReader(splits[1], job, Reporter.NULL);
+    for(int i=6; i < 11; ++i) {
+      assertEquals("i="+ i, true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
   }
 
 }
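
[Editor's note: the hunks above replace raw bucket ids (e.g. 10) with an encoded "bucket property"
obtained via OrcRawRecordMerger.encodeBucketId(conf, BUCKET) and BucketCodec.V1.encode(options),
which is the ACID 2.0 value carried in each row's RecordIdentifier. Below is a minimal, hedged
sketch of how such an encoding could be round-tripped in a standalone check; it uses only calls
that already appear in this patch, and the helper name checkBucket is hypothetical, not part of
the patch.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketPropertySketch {
  // Hypothetical helper: encode a writer/bucket id the way the updated tests do,
  // then decode it with BucketCodec to confirm the round trip.
  static void checkBucket(int bucket) {
    Configuration conf = new Configuration();
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(bucket);
    int bucketProperty = BucketCodec.V1.encode(options);   // encoded value the tests now assert on
    int decoded = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
    if (decoded != bucket) {
      throw new IllegalStateException("round trip failed for bucket " + bucket);
    }
  }

  public static void main(String[] args) {
    checkBucket(10);  // the tests above use BUCKET = 10 and BUCKET = 0
    checkBucket(0);
  }
}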

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index be15517..e5c6458 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -125,6 +125,7 @@ public class TestOrcRecordUpdater {
     // read the stopping point for the first flush and make sure we only see
     // 3 rows
     long len = side.readLong();
+    len = side.readLong();
     Reader reader = OrcFile.createReader(bucketPath,
         new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
     assertEquals(3, reader.getNumberOfRows());
@@ -266,28 +267,51 @@ public class TestOrcRecordUpdater {
 
     Reader reader = OrcFile.createReader(bucketPath,
         new OrcFile.ReaderOptions(conf).filesystem(fs));
-    assertEquals(2, reader.getNumberOfRows());
+    assertEquals(1, reader.getNumberOfRows());
 
     RecordReader rows = reader.rows();
 
     // check the contents of the file
     assertEquals(true, rows.hasNext());
     OrcStruct row = (OrcStruct) rows.next(null);
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(row));
     assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
-    assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(20, OrcRecordUpdater.getBucket(row));
-    assertEquals(30, OrcRecordUpdater.getRowId(row));
+    assertEquals(100, OrcRecordUpdater.getOriginalTransaction(row));
+    int bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+    assertEquals(0, OrcRecordUpdater.getRowId(row));
     assertEquals("update",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    rows.close();
+
+    options.writingDeleteDelta(true);
+    bucketPath = AcidUtils.createFilename(root, options);
+    reader = OrcFile.createReader(bucketPath,
+      new OrcFile.ReaderOptions(conf).filesystem(fs));
+    assertEquals(2, reader.getNumberOfRows());
+
+    rows = reader.rows();
     assertEquals(true, rows.hasNext());
     row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
+    assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
+    bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+    assertEquals(30, OrcRecordUpdater.getRowId(row));
+    assertNull(OrcRecordUpdater.getRow(row));
+
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
     assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
     assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(20, OrcRecordUpdater.getBucket(row));
+    bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
     assertEquals(60, OrcRecordUpdater.getRowId(row));
     assertNull(OrcRecordUpdater.getRow(row));
+
     assertEquals(false, rows.hasNext());
   }
 }
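
[Editor's note: testGetLogicalLength above and the TestOrcRecordUpdater hunk in this patch both
rely on the "_flush_length" side file that OrcRecordUpdater writes next to each bucket. Judging
from the tests, it holds a sequence of 8-byte longs, each recording how many bytes of the
(possibly still open) bucket file were readable at a flush point, and AcidUtils.getLogicalLength()
consults it. Below is a hedged sketch of reading the last recorded length; the helper name
lastFlushedLength and the example path are illustrative, not part of Hive's API.]

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

public class SideFileSketch {
  // Hypothetical helper: return the last length recorded in bucket_NNNNN_flush_length,
  // i.e. how many bytes of the delta bucket are safe to read.
  static long lastFlushedLength(FileSystem fs, Path bucketFile) throws IOException {
    Path sideFile = OrcAcidUtils.getSideFile(bucketFile);
    long entries = fs.getFileStatus(sideFile).getLen() / 8;  // each entry is one 8-byte long
    long len = -1;
    try (FSDataInputStream in = fs.open(sideFile)) {
      for (long i = 0; i < entries; i++) {
        len = in.readLong();                                 // keep the last recorded length
      }
    }
    return len;
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path bucket = new Path("/tmp/acid/delta_0000010_0000019/bucket_00001");  // illustrative path
    System.out.println("readable length: " + lastFlushedLength(fs, bucket));
  }
}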

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 439ec9b..43e0a4a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -246,10 +246,6 @@ public class TestVectorizedOrcAcidRowBatchReader {
   @Test
   public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
     OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
-    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
-        AcidUtils.AcidOperationalProperties.getLegacy().toInt());
-    // Test false when trying to create a vectorized ACID row batch reader for a legacy table.
-    assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
 
     conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
         AcidUtils.AcidOperationalProperties.getDefault().toInt());

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index bbed591..7455fa8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -409,6 +409,11 @@ public abstract class CompactorTest {
       return null;
     }
 
+    /**
+     * This is bogus, especially with split-update acid tables.  It causes compaction to create
+     * delete_delta_x_y where none existed before, producing a data layout that would never be
+     * created by the 'real' code path.
+     */
     @Override
     public boolean isDelete(Text value) {
       // Alternate between returning deleted and not.  This is easier than actually
@@ -552,4 +557,7 @@ public abstract class CompactorTest {
   String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
     return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
   }
+  String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+    return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId);
+  }
 }
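
[Editor's note: with ACID 2.0 (split update), an UPDATE is written as a delete event in a
delete_delta_x_y directory plus an insert in delta_x_y, which is why the compactor tests in this
patch now expect extra delete_delta directories and smaller per-bucket files. A minimal sketch of
the directory naming follows, using only the AcidUtils helpers this patch already calls; the
comments follow the zero-padded form asserted elsewhere in the patch (e.g. "base_0000100").]

import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidLayoutSketch {
  public static void main(String[] args) {
    System.out.println(AcidUtils.baseDir(100));                // e.g. base_0000100
    System.out.println(AcidUtils.deltaSubdir(200, 200));       // delta_<min>_<max>, zero-padded
    System.out.println(AcidUtils.deleteDeltaSubdir(200, 200)); // delete_delta_<min>_<max>
    // the Hive 1.3.0+ ("use130Format") variants append a statement id:
    System.out.println(AcidUtils.deltaSubdir(200, 200, 0));
    System.out.println(AcidUtils.deleteDeltaSubdir(200, 200, 0));
  }
}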

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index efd6ed8..8d01543 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -289,7 +289,7 @@ public class TestWorker extends CompactorTest {
     CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
     txnHandler.compact(rqst);
 
-    startWorker();
+    startWorker();//adds delta and delete_delta
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -299,7 +299,7 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -310,9 +310,19 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
-      } else {
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
     }
@@ -348,14 +358,15 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
 
   @Test
@@ -380,18 +391,19 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should still now be 6 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    Assert.assertEquals(6, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
   }
 
   @Test
@@ -409,7 +421,7 @@ public class TestWorker extends CompactorTest {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
 
-    startWorker();
+    startWorker();//this will create delta_20_24 and delete_delta_20_24. See MockRawReader
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -419,7 +431,7 @@ public class TestWorker extends CompactorTest {
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -430,9 +442,18 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
-      } else {
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+          sawNewDelta = true;
+          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+          Assert.assertEquals(2, buckets.length);
+          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+          Assert.assertEquals(104L, buckets[0].getLen());
+          Assert.assertEquals(104L, buckets[1].getLen());
+        } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
     }
@@ -462,7 +483,7 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(3, stat.length);
+    Assert.assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -473,8 +494,17 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
@@ -534,6 +564,17 @@ public class TestWorker extends CompactorTest {
   public void majorNoBaseLotsOfDeltas() throws Exception {
     compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
   }
+
+  /**
+   * These tests are starting to be a hack.  The files written by addDeltaFile() are not proper
+   * Acid files and the {@link CompactorTest.MockRawReader} performs no merging of delta files and
+   * fakes isDelete() as a shortcut.  This makes the files created on disk unrepresentative of
+   * what they would look like in a real system.
+   * Making {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorTest.MockRawReader} do proper
+   * delete event handling would duplicate either OrcRawRecordMerger or VectorizedOrcAcidRowBatchReader.
+   * @param type
+   * @throws Exception
+   */
   private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
     conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
     Table t = newTable("default", "mapwb", true);
@@ -570,45 +611,55 @@ public class TestWorker extends CompactorTest {
 
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(9, stat.length);
+    /* delete_delta_21_23 and delete_delta_25_33 are created as a result of compacting */
+    int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
+    Assert.assertEquals(numFilesExpected, stat.length);
 
     // Find the new delta file and make sure it has the right contents
-    BitSet matchesFound = new BitSet(9);
-    for (int i = 0; i < stat.length; i++) {
+    BitSet matchesFound = new BitSet(numFilesExpected);
+    for (int i = 0, j = 0; i < stat.length; i++) {
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) {
+        matchesFound.set(j++);
+      }
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) {
+        matchesFound.set(j++);
+      }
       if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
-        matchesFound.set(0);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
-        matchesFound.set(1);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
-        matchesFound.set(2);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
-        matchesFound.set(3);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
-        matchesFound.set(4);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
-        matchesFound.set(5);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
-        matchesFound.set(6);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
-        matchesFound.set(7);
+        matchesFound.set(j++);
       }
       switch (type) {
-        //yes, both do set(8)
         case MINOR:
           if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
+          }
+          if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) {
+            matchesFound.set(j++);
           }
           break;
         case MAJOR:
           if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
           }
           break;
         default:

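For context on the directory names asserted above: compaction of an ACID table writes its output into delta, delete_delta, and base directories whose names encode the covered transaction-id range. The MINOR case now expects both makeDeltaDirNameCompacted(21, 35) and makeDeleteDeltaDirNameCompacted(21, 35), while the MAJOR case expects AcidUtils.baseDir(35); MINOR therefore leaves one more directory than MAJOR, which matches the +1 in numFilesExpected. Below is a minimal, self-contained sketch of that naming scheme. It is illustrative only: it does not use the real AcidUtils or CompactorTest helpers, and the seven-digit zero padding is an assumption rather than something shown in this diff.

// Illustrative sketch only: the real name builders live in AcidUtils and the
// CompactorTest helpers used above; the 7-digit zero padding is an assumption.
public class AcidDirNameSketch {

  // delta_<min>_<max>: insert data covering transactions min..max
  static String deltaDir(long minTxn, long maxTxn) {
    return String.format("delta_%07d_%07d", minTxn, maxTxn);
  }

  // delete_delta_<min>_<max>: delete events covering transactions min..max,
  // kept separate from insert deltas under acid 2.0
  static String deleteDeltaDir(long minTxn, long maxTxn) {
    return String.format("delete_delta_%07d_%07d", minTxn, maxTxn);
  }

  // base_<txn>: fully compacted data up to and including txn
  static String baseDir(long txn) {
    return String.format("base_%07d", txn);
  }

  public static void main(String[] args) {
    // Minor compaction of transactions 21..35 leaves both a compacted delta
    // and a compacted delete_delta:
    System.out.println(deltaDir(21, 35));        // delta_0000021_0000035
    System.out.println(deleteDeltaDir(21, 35));  // delete_delta_0000021_0000035
    // Major compaction writes a single base directory instead:
    System.out.println(baseDir(35));             // base_0000035
  }
}

The split of delete events into separate delete_delta directories is the acid 2.0 behavior this patch makes the default, which is also why the earlier TestWorker hunks now assert the presence of makeDeleteDeltaDirNameCompacted(21, 24) and makeDeleteDeltaDirNameCompacted(1, 4).
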
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
index 18408e4..80ee0ba 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
@@ -108,7 +108,7 @@ when matched then update set
   end_date = date '2017-03-15' 
 when not matched then insert values
   (sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null);
-select * from customer order by source_pk;
+select * from customer order by source_pk, is_current;
 
 drop table customer;
 drop table customer_updates;

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
index 78c9070..7f837cc 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
@@ -981,11 +981,11 @@ POSTHOOK: Lineage: customer.sk EXPRESSION [(new_customer_stage)stage.FieldSchema
 POSTHOOK: Lineage: customer.source_pk SIMPLE [(new_customer_stage)stage.FieldSchema(name:source_pk, type:int, comment:null), ]
 POSTHOOK: Lineage: customer.state SIMPLE [(new_customer_stage)stage.FieldSchema(name:state, type:string, comment:null), ]
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(customer)customer.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), ]
-PREHOOK: query: select * from customer order by source_pk
+PREHOOK: query: select * from customer order by source_pk, is_current
 PREHOOK: type: QUERY
 PREHOOK: Input: type2_scd_helper@customer
 #### A masked pattern was here ####
-POSTHOOK: query: select * from customer order by source_pk
+POSTHOOK: query: select * from customer order by source_pk, is_current
 POSTHOOK: type: QUERY
 POSTHOOK: Input: type2_scd_helper@customer
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/row__id.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out
index 059ace9..9aab097 100644
--- a/ql/src/test/results/clientpositive/row__id.q.out
+++ b/ql/src/test/results/clientpositive/row__id.q.out
@@ -56,23 +56,23 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
             Select Operator
               expressions: ROW__ID.transactionid (type: bigint)
               outputColumnNames: _col0
-              Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+              Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
               Reduce Output Operator
                 key expressions: _col0 (type: bigint)
                 sort order: +
-                Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+                Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: bigint)
           outputColumnNames: _col0
-          Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+          Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
           File Output Operator
             compressed: false
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
             table:
                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -117,17 +117,17 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
             Filter Operator
               predicate: (ROW__ID.transactionid = 3) (type: boolean)
-              Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: ROW__ID.transactionid (type: bigint)
                 outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

