hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject [2/2] hive git commit: HIVE-17458 VectorizedOrcAcidRowBatchReader doesn't handle 'original' files (Eugene Koifman, reviewed by Sergey Shelukhin)
Date Sat, 04 Nov 2017 17:14:46 GMT
HIVE-17458 VectorizedOrcAcidRowBatchReader doesn't handle 'original' files (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1649c074
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1649c074
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1649c074

Branch: refs/heads/master
Commit: 1649c0741a8c065b4831e8495a2a01b919d8b6d0
Parents: 7006ade
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Sat Nov 4 10:14:04 2017 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Sat Nov 4 10:14:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  13 +-
 .../test/resources/testconfiguration.properties |   6 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |  33 +-
 .../apache/hadoop/hive/ql/udf/UDFRunWorker.java |  35 +
 .../hive/llap/io/api/impl/LlapRecordReader.java |   3 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  51 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |   2 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  11 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   2 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 352 +++++++--
 .../ql/io/orc/VectorizedOrcAcidRowReader.java   | 143 ----
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |   6 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 118 +++
 .../TestVectorizedOrcAcidRowBatchReader.java    |  24 +-
 .../acid_vectorization_original.q               | 138 ++++
 .../acid_vectorization_original_tez.q           | 125 ++++
 .../llap/acid_vectorization_original.q.out      | 726 ++++++++++++++++++
 .../tez/acid_vectorization_original_tez.q.out   | 737 +++++++++++++++++++
 18 files changed, 2232 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cbe4de5..48341a8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1895,15 +1895,10 @@ public class HiveConf extends Configuration {
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
     HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
-        "Sets the operational properties that control the appropriate behavior for various\n"
-        + "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
-        + "for future versions of ACID. (See HIVE-14035 for details.)\n"
-        + "0: Turn on the legacy mode for ACID\n"
-        + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n"
-        + "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n"
-        + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n"
-        + "This is intended to be used as an internal property for future versions of ACID. (See\n" +
-          "HIVE-14035 for details.)"),
+      "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
+      "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
+      "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+        "HIVE-14035 for details.)"),
 
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 462f332..9642697 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1,3 +1,5 @@
+# Note: the *.shared groups also run on TestCliDriver
+
 # NOTE: files should be listed in alphabetical order
 minimr.query.files=infer_bucket_sort_map_operators.q,\
   infer_bucket_sort_dyn_part.q,\
@@ -50,7 +52,8 @@ minitez.query.files.shared=delete_orig_table.q,\
 
 # NOTE: Add tests to minitez only if it is very
 # specific to tez and cannot be added to minillap.
-minitez.query.files=explainuser_3.q,\
+minitez.query.files=acid_vectorization_original_tez.q,\
+  explainuser_3.q,\
   explainanalyze_1.q,\
   explainanalyze_2.q,\
   explainanalyze_3.q,\
@@ -474,6 +477,7 @@ minillaplocal.query.files=\
   acid_no_buckets.q, \
   acid_globallimit.q,\
   acid_vectorization_missing_cols.q,\
+  acid_vectorization_original.q,\
   alter_merge_stats_orc.q,\
   authorization_view_8.q,\
   auto_join30.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 4477954..6c34c08 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -85,9 +85,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.cli.CliDriver;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -1561,6 +1559,16 @@ public class QTestUtil {
           }
         }
       }
+      else {
+        for (PatternReplacementPair prp : partialPlanMask) {
+          matcher = prp.pattern.matcher(line);
+          if (matcher.find()) {
+            line = line.replaceAll(prp.pattern.pattern(), prp.replacement);
+            partialMaskWasMatched = true;
+            break;
+          }
+        }
+      }
 
       if (!partialMaskWasMatched) {
         for (Pair<Pattern, String> pair : patternsWithMaskComments) {
@@ -1650,7 +1658,26 @@ public class QTestUtil {
       "data/warehouse/(.*?/)+\\.hive-staging"  // the directory might be db/table/partition
       //TODO: add more expected test result here
   });
-
+  /**
+   * Pattern to match and (partial) replacement text.
+   * For example, {"transactionid":76,"bucketid":8249877}.  We just want to mask 76 but a regex that
+   * matches just 76 will match a lot of other things.
+   */
+  private final static class PatternReplacementPair {
+    private final Pattern pattern;
+    private final String replacement;
+    PatternReplacementPair(Pattern p, String r) {
+      pattern = p;
+      replacement = r;
+    }
+  }
+  private final PatternReplacementPair[] partialPlanMask;
+  {
+    ArrayList<PatternReplacementPair> ppm = new ArrayList<>();
+    ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"transactionid\":[1-9][0-9]*,\"bucketid\":"),
+      "{\"transactionid\":### Masked txnid ###,\"bucketid\":"));
+    partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
+  }
   /* This list may be modified by specific cli drivers to mask strings that change on every test */
   private final List<Pair<Pattern, String>> patternsWithMaskComments = new ArrayList<Pair<Pattern, String>>() {{
     add(toPatternPair("(pblob|s3.?|swift|wasb.?).*hive-staging.*","### BLOBSTORE_STAGING_PATH ###"));

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
new file mode 100644
index 0000000..de6aca2
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf;
+
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A UDF for testing, which launches a compaction Worker
+ */
+@Description(name = "runWorker",
+  value = "_FUNC_() - UDF launching Compaction Worker")
+public class UDFRunWorker extends UDF {
+  public void evaluate() throws Exception {
+    TestTxnCommands2.runWorker(SessionState.get().getConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index d66fac2..5f010be 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -277,7 +278,7 @@ class LlapRecordReader
         acidVrb.cols = cvb.cols;
         acidVrb.size = cvb.size;
         final VectorizedOrcAcidRowBatchReader acidReader =
-            new VectorizedOrcAcidRowBatchReader(split, jobConf, Reporter.NULL,
+            new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, Reporter.NULL,
                 new RecordReader<NullWritable, VectorizedRowBatch>() {
                   @Override
                   public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c364343..1e5b841 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1849,6 +1849,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
   getRecordReader(InputSplit inputSplit, JobConf conf,
                   Reporter reporter) throws IOException {
+    //CombineHiveInputFormat may produce FileSplit that is not OrcSplit
     boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
     boolean isAcidRead = isAcidRead(conf, inputSplit);
     if (!isAcidRead) {
@@ -1868,27 +1869,19 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     reporter.setStatus(inputSplit.toString());
+    //if here we are now doing an Acid read so must have OrcSplit.  CombineHiveInputFormat is
+    // disabled for acid path
+    OrcSplit split = (OrcSplit) inputSplit;
 
-    boolean isFastVectorizedReaderAvailable =
-        VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit);
-
-    if (vectorMode && isFastVectorizedReaderAvailable) {
-      // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching.
+    if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
-          new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter);
+          new VectorizedOrcAcidRowBatchReader(split, conf, reporter);
     }
 
     Options options = new Options(conf).reporter(reporter);
     final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-    if (vectorMode && !isFastVectorizedReaderAvailable) {
-      // Vectorized regular ACID reader that does row-by-row stitching.
-      return (org.apache.hadoop.mapred.RecordReader)
-          new VectorizedOrcAcidRowReader(inner, conf,
-              Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
-    } else {
-      // Non-vectorized regular ACID reader.
-      return new NullKeyRecordReader(inner, conf);
-    }
+    // Non-vectorized regular ACID reader.
+    return new NullKeyRecordReader(inner, conf);
   }
 
   /**
@@ -1945,38 +1938,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                             throws IOException {
 
     final OrcSplit split = (OrcSplit) inputSplit;
-    final Path path = split.getPath();
-    Path root;
-    if (split.hasBase()) {
-      if (split.isOriginal()) {
-        root = split.getRootDir();
-      } else {
-        root = path.getParent().getParent();
-        assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
-          " path.p.p=" + root;
-      }
-    } else {
-      throw new IllegalStateException("Split w/o base: " + path);
-    }
 
     // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
     AcidUtils.AcidOperationalProperties acidOperationalProperties
             = AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+    if(!acidOperationalProperties.isSplitUpdate()) {
+      throw new IllegalStateException("Expected SplitUpdate table: " + split.getPath());
+    }
 
-    // The deltas are decided based on whether split-update has been turned on for the table or not.
-    // When split-update is turned off, everything in the delta_x_y/ directory should be treated
-    // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory
-    // need to be considered as delta, because files in delta_x_y/ will be processed as base files
-    // since they only have insert events in them.
-    final Path[] deltas =
-        acidOperationalProperties.isSplitUpdate() ?
-            AcidUtils.deserializeDeleteDeltas(root, split.getDeltas())
-            : AcidUtils.deserializeDeltas(root, split.getDeltas());
+    final Path[] deltas = VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(split);
     final Configuration conf = options.getConfiguration();
 
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
-    mergerOptions.rootPath(root);
+    mergerOptions.rootPath(split.getRootDir());
     final int bucket;
     if (split.hasBase()) {
       AcidOutputFormat.Options acidIOOptions =

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index eed6d22..95a60dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -433,7 +433,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           AcidOutputFormat.Options bucketOptions =
             AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
           if (bucketOptions.getBucketId() != bucketId) {
-            continue;
+            continue;//todo: HIVE-16952
           }
           if (haveSeenCurrentFile) {
             //if here we already saw current file and now found another file for the same bucket

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1e19a91..315cc1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -62,21 +62,24 @@ public class OrcRecordUpdater implements RecordUpdater {
 
   private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class);
 
-  public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
-  public static final String ACID_FORMAT = "_orc_acid_version";
-  public static final int ORC_ACID_VERSION = 0;
+  static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
+  private static final String ACID_FORMAT = "_orc_acid_version";
+  private static final int ORC_ACID_VERSION = 0;
 
 
   final static int INSERT_OPERATION = 0;
   final static int UPDATE_OPERATION = 1;
   final static int DELETE_OPERATION = 2;
-
+  //column indexes of corresponding data in storage layer
   final static int OPERATION = 0;
   final static int ORIGINAL_TRANSACTION = 1;
   final static int BUCKET = 2;
   final static int ROW_ID = 3;
   final static int CURRENT_TRANSACTION = 4;
   final static int ROW = 5;
+  /**
+   * total number of fields (above)
+   */
   final static int FIELDS = 6;
 
   final static int DELTA_BUFFER_SIZE = 16 * 1024;

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 260a5ac..58638b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -29,7 +29,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
@@ -238,6 +237,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     final AcidUtils.AcidOperationalProperties acidOperationalProperties
         = AcidUtils.getAcidOperationalProperties(conf);
     final boolean isSplitUpdate = acidOperationalProperties.isSplitUpdate();
+    assert isSplitUpdate : "should be true in Hive 3.0";
 
     if (isOriginal) {
       if (!isAcidRead && !hasDelta) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 1e16f09..bcde4fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -37,9 +38,12 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -51,10 +55,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 /**
- * A fast vectorized batch reader class for ACID when split-update behavior is enabled.
- * When split-update is turned on, row-by-row stitching could be avoided to create the final
- * version of a row. Essentially, there are only insert and delete events. Insert events can be
- * directly read from the base files/insert_only deltas in vectorized row batches. The deleted
+ * A fast vectorized batch reader class for ACID. Insert events are read directly
+ * from the base files/insert_only deltas in vectorized row batches. The deleted
  * rows can then be easily indicated via the 'selected' field of the vectorized row batch.
  * Refer HIVE-14233 for more details.
  */
@@ -63,26 +65,51 @@ public class VectorizedOrcAcidRowBatchReader
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
 
-  public org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader;
-  protected VectorizedRowBatchCtx rbCtx;
-  protected VectorizedRowBatch vectorizedRowBatchBase;
+  private org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader;
+  private final VectorizedRowBatchCtx rbCtx;
+  private VectorizedRowBatch vectorizedRowBatchBase;
   private long offset;
   private long length;
   protected float progress = 0.0f;
   protected Object[] partitionValues;
-  protected boolean addPartitionCols = true;
-  private ValidTxnList validTxnList;
-  protected DeleteEventRegistry deleteEventRegistry;
-  protected StructColumnVector recordIdColumnVector;
-  private org.apache.orc.Reader.Options readerOptions;
+  private boolean addPartitionCols = true;
+  private final ValidTxnList validTxnList;
+  private final DeleteEventRegistry deleteEventRegistry;
+  /**
+   * {@link RecordIdentifier}/{@link VirtualColumn#ROWID} information
+   */
+  private final StructColumnVector recordIdColumnVector;
+  private final Reader.Options readerOptions;
+  private final boolean isOriginal;
+  /**
+   * something further in the data pipeline wants {@link VirtualColumn#ROWID}
+   */
+  private final boolean rowIdProjected;
+  /**
+   * partition/table root
+   */
+  private final Path rootPath;
+  /**
+   * for reading "original" files
+   */
+  private final OffsetAndBucketProperty syntheticProps;
+  /**
+   * To have access to {@link RecordReader#getRowNumber()} in the underlying file
+   */
+  private RecordReader innerReader;
 
-  public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
-        Reporter reporter) throws IOException {
-    this.init(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf));
+  VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
+                                  Reporter reporter) throws IOException {
+    this(inputSplit, conf,reporter, null);
+  }
+  @VisibleForTesting
+  VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
+        Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException {
+    this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx);
 
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit);
     // Careful with the range here now, we do not want to read the whole base file like deltas.
-    final RecordReader innerReader = reader.rowsOptions(readerOptions.range(offset, length));
+    innerReader = reader.rowsOptions(readerOptions.range(offset, length));
     baseReader = new org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>() {
 
       @Override
@@ -117,16 +144,19 @@ public class VectorizedOrcAcidRowBatchReader
     };
     this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch();
   }
-
-  public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter,
+  /**
+   * LLAP IO c'tor
+   */
+  public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter,
       org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
       VectorizedRowBatchCtx rbCtx) throws IOException {
-    this.init(inputSplit, conf, reporter, rbCtx);
+    this(conf, inputSplit, reporter, rbCtx);
     this.baseReader = baseReader;
+    this.innerReader = null;
     this.vectorizedRowBatchBase = baseReader.createValue();
   }
 
-  private void init(InputSplit inputSplit, JobConf conf, Reporter reporter,
+  private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
       VectorizedRowBatchCtx rowBatchCtx) throws IOException {
     this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -143,8 +173,7 @@ public class VectorizedOrcAcidRowBatchReader
     final OrcSplit orcSplit = (OrcSplit) inputSplit;
 
     reporter.setStatus(orcSplit.toString());
-    readerOptions = OrcInputFormat.createOptionsForReader(conf);
-    readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+    readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
 
     this.offset = orcSplit.getStart();
     this.length = orcSplit.getLength();
@@ -167,55 +196,161 @@ public class VectorizedOrcAcidRowBatchReader
     deleteEventReaderOptions.range(0, Long.MAX_VALUE);
     //  Disable SARGs for deleteEventReaders, as SARGs have no meaning.
     deleteEventReaderOptions.searchArgument(null, null);
+    DeleteEventRegistry der;
     try {
       // See if we can load all the delete events from all the delete deltas in memory...
-      this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+      der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
     } catch (DeleteEventsOverflowMemoryException e) {
       // If not, then create a set of hanging readers that do sort-merge to find the next smallest
       // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
-      this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+      der = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
     }
-
-    recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+    this.deleteEventRegistry = der;
+    isOriginal = orcSplit.isOriginal();
+    if(isOriginal) {
+      recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+        new LongColumnVector(), new LongColumnVector(), new LongColumnVector());
+    }
+    else {
+      //will swap in the Vectors from underlying row batch
+      recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+    }
+    rowIdProjected = areRowIdsProjected(rbCtx);
+    rootPath = orcSplit.getRootDir();
+    syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList);
   }
 
   /**
-   * Returns whether it is possible to create a valid instance of this class for a given split.
-   * @param conf is the job configuration
-   * @param inputSplit
-   * @return true if it is possible, else false.
+   * Used for generating synthetic ROW__IDs for reading "original" files
+   */
+  private static final class OffsetAndBucketProperty {
+    private final long rowIdOffset;
+    private final int bucketProperty;
+    private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+      this.rowIdOffset = rowIdOffset;
+      this.bucketProperty = bucketProperty;
+    }
+  }
+  /**
+   * See {@link #next(NullWritable, VectorizedRowBatch)} first and
+   * {@link OrcRawRecordMerger.OriginalReaderPair}.
+   * When reading a split of an "original" file we may need to decorate data with ROW__ID.
+   * This requires treating multiple files that are part of the same bucket (tranche for unbucketed
+   * tables) as a single logical file to number rowids consistently.
+   *
+   * todo: This logic is executed per split of every "original" file.  The computed result is the
+   * same for every split from the same file so this could be optimized by moving it to
+   * before/during split computation and passing the info in the split.  (HIVE-17917)
+   */
+  private OffsetAndBucketProperty computeOffsetAndBucket(
+    OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
+    if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+      return new OffsetAndBucketProperty(0,0);
+    }
+    long rowIdOffset = 0;
+    int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
+    int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+      validTxnList, false, true);
+    for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+      AcidOutputFormat.Options bucketOptions =
+        AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+      if (bucketOptions.getBucketId() != bucketId) {
+        continue;//HIVE-16952
+      }
+      if (f.getFileStatus().getPath().equals(split.getPath())) {
+        //'f' is the file from which this split was created
+        break;
+      }
+      Reader reader = OrcFile.createReader(f.getFileStatus().getPath(),
+        OrcFile.readerOptions(conf));
+      rowIdOffset += reader.getNumberOfRows();
+    }
+    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+  }
+  /**
+   * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
+   * In some cases this cannot be used from LLAP IO elevator because
+   * {@link RecordReader#getRowNumber()} is not (currently) available there but is required to
+   * generate ROW__IDs for "original" files
+   * @param hasDeletes - if there are any deletes that apply to this split
+   * todo: HIVE-17944
    */
-  public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
-    if (!(inputSplit instanceof OrcSplit)) {
-      return false; // must be an instance of OrcSplit.
-    }
-    // First check if we are reading any original files in the split.
-    // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
-    // original files for now as they have a different schema than a regular ACID file.
-    final OrcSplit split = (OrcSplit) inputSplit;
-    if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
-      // When split-update is turned on for ACID, a more optimized vectorized batch reader
-      // can be created. But still only possible when we are *NOT* reading any originals.
+  static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
+    if(!split.isOriginal()) {
       return true;
     }
-    return false; // no split-update or possibly reading originals!
+    VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    if(rbCtx == null) {
+      throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
+    }
+    return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
   }
 
-  private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
+  /**
+   * Does this reader need to decorate rows with ROW__IDs (for "original" reads).
+   * Even if ROW__ID is not projected you still need to decorate the rows with them to see if
+   * any of the delete events apply.
+   */
+  private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
+    return split.isOriginal() && (hasDeletes || rowIdProjected);
+  }
+  private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
+    if(rbCtx.getVirtualColumnCount() == 0) {
+      return false;
+    }
+    for(VirtualColumn vc : rbCtx.getNeededVirtualColumns()) {
+      if(vc == VirtualColumn.ROWID) {
+        //The query needs ROW__ID: maybe explicitly asked, maybe it's part of
+        // Update/Delete statement.
+        //Either way, we need to decorate "original" rows with row__id
+        return true;
+      }
+    }
+    return false;
+  }
+  static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
     Path path = orcSplit.getPath();
     Path root;
     if (orcSplit.hasBase()) {
       if (orcSplit.isOriginal()) {
-        root = path.getParent();
+        root = orcSplit.getRootDir();
       } else {
         root = path.getParent().getParent();
+        assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
+          " path.p.p=" + root;
       }
     } else {
-      root = path;
+      throw new IllegalStateException("Split w/o base w/Acid 2.0??: " + path);
     }
     return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas());
   }
 
+  /**
+   * There are 2 types of schema from the {@link #baseReader} that this handles.  In the case
+   * the data was written to a transactional table from the start, every row is decorated with
+   * transaction related info and looks like <op, otid, writerId, rowid, ctid, <f1, ... fn>>.
+   *
+   * The other case is when data was written to non-transactional table and thus only has the user
+   * data: <f1, ... fn>.  Then this table was then converted to a transactional table but the data
+   * files are not changed until major compaction.  These are the "original" files.
+   *
+   * In this case we may need to decorate the outgoing data with transactional column values at
+   * read time.  (It's done somewhat out of band via VectorizedRowBatchCtx - ask Teddy Choi).
+   * The "otid, writerId, rowid" columns represent {@link RecordIdentifier}.  They are assigned
+   * each time the table is read in a way that needs to project {@link VirtualColumn#ROWID}.
+   * Major compaction will attach these values to each row permanently.
+   * It's critical that these generated column values are assigned exactly the same way by each
+   * read of the same row and by the Compactor.
+   * See {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorMR} and
+   * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} for the Compactor read path.
+   * (Longer term should make compactor use this class)
+   *
+   * This only decorates original rows with metadata if something above is requesting these values
+   * or if there are Delete events to apply.
+   *
+   * @return false where there is no more data, i.e. {@code value} is empty
+   */
   @Override
   public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
     try {
@@ -257,12 +392,60 @@ public class VectorizedOrcAcidRowBatchReader
       // When selectedInUse is set to false, everything in the batch is selected.
       selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
     }
-
-    // Case 1- find rows which belong to transactions that are not valid.
-    findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+    ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols;
+    if(isOriginal) {
+      /*
+       * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
+       * to see if any deletes apply
+       */
+      if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+        if(innerReader == null) {
+          throw new IllegalStateException(getClass().getName() + " requires " +
+            org.apache.orc.RecordReader.class +
+            " to handle original files that require ROW__IDs: " + rootPath);
+        }
+        /**
+         * {@link RecordIdentifier#getTransactionId()}
+         */
+        recordIdColumnVector.fields[0].noNulls = true;
+        recordIdColumnVector.fields[0].isRepeating = true;
+        //all "original" is considered written by txnid:0 which committed
+        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+        /**
+         * This is {@link RecordIdentifier#getBucketProperty()}
+         * Also see {@link BucketCodec}
+         */
+        recordIdColumnVector.fields[1].noNulls = true;
+        recordIdColumnVector.fields[1].isRepeating = true;
+        ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty;
+        /**
+         * {@link RecordIdentifier#getRowId()}
+         */
+        recordIdColumnVector.fields[2].noNulls = true;
+        recordIdColumnVector.fields[2].isRepeating = false;
+        long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector;
+        for(int i = 0; i < vectorizedRowBatchBase.size; i++) {
+          //baseReader.getRowNumber() seems to point at the start of the batch todo: validate
+          rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i;
+        }
+        //Now populate a structure to use to apply delete events
+        innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
+        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
+        innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
+        innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+      }
+    }
+    else {
+      // Case 1- find rows which belong to transactions that are not valid.
+      findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+      /**
+       * All "original" data belongs to txnid:0 and is always valid/committed for every reader
+       * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
+       */
+    }
 
     // Case 2- find rows which have been deleted.
-    this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase.cols,
+    this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector,
         vectorizedRowBatchBase.size, selectedBitSet);
 
     if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) {
@@ -283,30 +466,39 @@ public class VectorizedOrcAcidRowBatchReader
       }
     }
 
-    // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
-    // NOTE: We only link up the user columns and not the ACID metadata columns because this
-    // vectorized code path is not being used in cases of update/delete, when the metadata columns
-    // would be expected to be passed up the operator pipeline. This is because
-    // currently the update/delete specifically disable vectorized code paths.
-    // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
-    StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
-    // Transfer columnVector objects from base batch to outgoing batch.
-    System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
-    if (rbCtx != null) {
-      recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
-      recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
-      recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+    if(isOriginal) {
+     /*Just copy the payload.  {@link recordIdColumnVector} has already been populated*/
+      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0,
+        value.getDataColumnCount());
+    }
+    else {
+      // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+      // NOTE: We only link up the user columns and not the ACID metadata columns because this
+      // vectorized code path is not being used in cases of update/delete, when the metadata columns
+      // would be expected to be passed up the operator pipeline. This is because
+      // currently the update/delete specifically disable vectorized code paths.
+      // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+      StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
+      // Transfer columnVector objects from base batch to outgoing batch.
+      System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+      if(rowIdProjected) {
+        recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+        recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
+        recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+      }
+    }
+    if(rowIdProjected) {
       rbCtx.setRecordIdColumnVector(recordIdColumnVector);
     }
     progress = baseReader.getProgress();
     return true;
   }
 
-  protected void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
+  private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
     findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet);
   }
 
-  protected void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) {
+  private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) {
     if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
       // When we have repeating values, we can unset the whole bitset at once
       // if the repeating value is not a valid transaction.
@@ -387,6 +579,11 @@ public class VectorizedOrcAcidRowBatchReader
      * @throws IOException
      */
     public void close() throws IOException;
+
+    /**
+     * @return {@code true} if no delete events were found
+     */
+    boolean isEmpty();
   }
 
   /**
@@ -400,10 +597,10 @@ public class VectorizedOrcAcidRowBatchReader
     private OrcRawRecordMerger deleteRecords;
     private OrcRawRecordMerger.ReaderKey deleteRecordKey;
     private OrcStruct deleteRecordValue;
-    private boolean isDeleteRecordAvailable = true;
+    private Boolean isDeleteRecordAvailable = null;
     private ValidTxnList validTxnList;
 
-    public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
+    SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
       throws IOException {
         final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
         if (deleteDeltas.length > 0) {
@@ -428,6 +625,13 @@ public class VectorizedOrcAcidRowBatchReader
     }
 
     @Override
+    public boolean isEmpty() {
+      if(isDeleteRecordAvailable == null) {
+        throw new IllegalStateException("Not yet initialized");
+      }
+      return !isDeleteRecordAvailable;
+    }
+    @Override
     public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet)
         throws IOException {
       if (!isDeleteRecordAvailable) {
@@ -546,7 +750,7 @@ public class VectorizedOrcAcidRowBatchReader
        */
       private int bucketProperty; 
       private long rowId;
-      public DeleteRecordKey() {
+      DeleteRecordKey() {
         this.originalTransactionId = -1;
         this.rowId = -1;
       }
@@ -596,7 +800,7 @@ public class VectorizedOrcAcidRowBatchReader
       private boolean isBucketPropertyRepeating;
       private final boolean isBucketedTable;
 
-      public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
+      DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
           ValidTxnList validTxnList, boolean isBucketedTable) throws IOException {
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
@@ -741,8 +945,9 @@ public class VectorizedOrcAcidRowBatchReader
     private long rowIds[];
     private CompressedOtid compressedOtids[];
     private ValidTxnList validTxnList;
+    private Boolean isEmpty = null;
 
-    public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
+    ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
       int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
       String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
@@ -804,6 +1009,7 @@ public class VectorizedOrcAcidRowBatchReader
             readAllDeleteEventsFromDeleteDeltas();
           }
         }
+        isEmpty = compressedOtids == null || rowIds == null;
       } catch(IOException|DeleteEventsOverflowMemoryException e) {
         close(); // close any open readers, if there was some exception during initialization.
         throw e; // rethrow the exception so that the caller can handle.
@@ -910,7 +1116,13 @@ public class VectorizedOrcAcidRowBatchReader
       }
       return false;
     }
-
+    @Override
+    public boolean isEmpty() {
+      if(isEmpty == null) {
+        throw new IllegalStateException("Not yet initialized");
+      }
+      return isEmpty;
+    }
     @Override
     public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet)
         throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
deleted file mode 100644
index 885ef83..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-
-/**
- * Implement a RecordReader that stitches together base and delta files to
- * support tables and partitions stored in the ACID format. It works by using
- * the non-vectorized ACID reader and moving the data into a vectorized row
- * batch.
- */
-public class VectorizedOrcAcidRowReader
-    implements org.apache.hadoop.mapred.RecordReader<NullWritable,
-                                                     VectorizedRowBatch> {
-  private final AcidInputFormat.RowReader<OrcStruct> innerReader;
-  private final RecordIdentifier key;
-  private final OrcStruct value;
-  private VectorizedRowBatchCtx rbCtx;
-  private Object[] partitionValues;
-  private final ObjectInspector objectInspector;
-  private final DataOutputBuffer buffer = new DataOutputBuffer();
-  private final StructColumnVector recordIdColumnVector;
-  private final LongColumnVector transactionColumnVector;
-  private final LongColumnVector bucketColumnVector;
-  private final LongColumnVector rowIdColumnVector;
-
-  public VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
-      Configuration conf, VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split)
-      throws IOException {
-    this.innerReader = inner;
-    this.key = inner.createKey();
-    rbCtx = vectorizedRowBatchCtx;
-    int partitionColumnCount = rbCtx.getPartitionColumnCount();
-    if (partitionColumnCount > 0) {
-      partitionValues = new Object[partitionColumnCount];
-      rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
-    }
-    this.value = inner.createValue();
-    this.objectInspector = inner.getObjectInspector();
-    this.transactionColumnVector = new LongColumnVector();
-    this.bucketColumnVector = new LongColumnVector();
-    this.rowIdColumnVector = new LongColumnVector();
-    this.recordIdColumnVector =
-        new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            transactionColumnVector, bucketColumnVector, rowIdColumnVector);
-  }
-
-  @Override
-  public boolean next(NullWritable nullWritable,
-                      VectorizedRowBatch vectorizedRowBatch
-                      ) throws IOException {
-    vectorizedRowBatch.reset();
-    buffer.reset();
-    if (!innerReader.next(key, value)) {
-      return false;
-    }
-    if (partitionValues != null) {
-      rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
-    }
-    try {
-      VectorizedBatchUtil.acidAddRowToBatch(value,
-          (StructObjectInspector) objectInspector,
-          vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
-      addRecordId(vectorizedRowBatch.size, key);
-      vectorizedRowBatch.size++;
-      while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
-          innerReader.next(key, value)) {
-        VectorizedBatchUtil.acidAddRowToBatch(value,
-            (StructObjectInspector) objectInspector,
-            vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
-        addRecordId(vectorizedRowBatch.size, key);
-        vectorizedRowBatch.size++;
-      }
-      rbCtx.setRecordIdColumnVector(recordIdColumnVector);
-    } catch (Exception e) {
-      throw new IOException("error iterating", e);
-    }
-    return true;
-  }
-
-  private void addRecordId(int index, RecordIdentifier key) {
-    transactionColumnVector.vector[index] = key.getTransactionId();
-    bucketColumnVector.vector[index] = key.getBucketProperty();
-    rowIdColumnVector.vector[index] = key.getRowId();
-  }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public VectorizedRowBatch createValue() {
-    return rbCtx.createVectorizedRowBatch();
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return innerReader.getPos();
-  }
-
-  @Override
-  public void close() throws IOException {
-    innerReader.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return innerReader.getProgress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 662462c..2c76f79 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -56,9 +56,9 @@ import java.util.concurrent.TimeUnit;
  * test AC=true, and AC=false with commit/rollback/exception and test resulting data.
  *
  * Can also test, calling commit in AC=true mode, etc, toggling AC...
- * 
- * Tests here are for multi-statement transactions (WIP) and those that don't need to
- * run with Acid 2.0 (see subclasses of TestTxnCommands2)
+ *
+ * Tests here are for multi-statement transactions (WIP) and others
+ * Mostly uses bucketed tables
  */
 public class TestTxnCommands extends TxnCommandsBaseForTests {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index c827dc4..f0d9ff2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hive.ql;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -522,5 +526,119 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
 //    Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
   }
+  /**
+   * Tests to check that we are able to use vectorized acid reader,
+   * VectorizedOrcAcidRowBatchReader, when reading "original" files,
+   * i.e. those that were written before the table was converted to acid.
+   * See also acid_vectorization_original*.q
+   */
+  @Test
+  public void testNonAcidToAcidVectorzied() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    //this enables vectorization of ROW__ID
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T(a int, b int) stored as orc");
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values));
+    //, 'transactional_properties'='default'
+    runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')");
+    //Execution mode: vectorized
+      //this uses VectorizedOrcAcidRowBatchReader
+    String query = "select a from T where b > 6 order by a";
+    List<String> rs = runStatementOnDriver(query);
+    String[][] expected = {
+      {"6", ""},
+      {"9", ""},
+    };
+    checkExpected(rs, expected, "After conversion");
+    Assert.assertEquals(Integer.toString(6), rs.get(0));
+    Assert.assertEquals(Integer.toString(9), rs.get(1));
+    assertVectorized(true, query);
+
+    //why isn't PPD working.... - it is working but storage layer doesn't do row level filtering; only row group level
+    //this uses VectorizedOrcAcidRowBatchReader
+    query = "select ROW__ID, a from T where b > 6 order by a";
+    rs = runStatementOnDriver(query);
+    String[][] expected1 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"}
+    };
+    checkExpected(rs, expected1, "After conversion with VC1");
+    assertVectorized(true, query);
+
+    //this uses VectorizedOrcAcidRowBatchReader
+    query = "select ROW__ID, a from T where b > 0 order by a";
+    rs = runStatementOnDriver(query);
+    String[][] expected2 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}", "1"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}", "2"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}", "5"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"}
+    };
+    checkExpected(rs, expected2, "After conversion with VC2");
+    assertVectorized(true, query);
+
+    //doesn't vectorize (uses neither of the Vectorized Acid readers)
+    query = "select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a";
+    rs = runStatementOnDriver(query);
+    Assert.assertEquals("", 2, rs.size());
+    String[][] expected3 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"}
+    };
+    checkExpected(rs, expected3, "After non-vectorized read");
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    //not vectorized because there is INPUT__FILE__NAME
+    assertVectorized(false, query);
+
+    runStatementOnDriver("update T set b = 17 where a = 1");
+    //this should use VectorizedOrcAcidRowReader
+    query = "select ROW__ID, b from T where b > 0 order by a";
+    rs = runStatementOnDriver(query);
+    String[][] expected4 = {
+      {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}","17"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}","4"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}","6"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}","8"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}","10"}
+    };
+    checkExpected(rs, expected4, "After conversion with VC4");
+    assertVectorized(true, query);
+
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    //this should not vectorize at all
+    query = "select ROW__ID, a, b, INPUT__FILE__NAME from T where b > 0 order by a, b";
+    rs = runStatementOnDriver(query);
+    String[][] expected5 = {//the row__ids are the same after compaction
+      {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "warehouse/t/base_0000025/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4",   "warehouse/t/base_0000025/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6",   "warehouse/t/base_0000025/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8",   "warehouse/t/base_0000025/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10",  "warehouse/t/base_0000025/bucket_00000"}
+    };
+    checkExpected(rs, expected5, "After major compaction");
+    //not vectorized because there is INPUT__FILE__NAME
+    assertVectorized(false, query);
+  }
+  private void assertVectorized(boolean vectorized, String query) throws Exception {
+    List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
+    for(String line : rs) {
+      if(line != null && line.contains("Execution mode: vectorized")) {
+        Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
+        return;
+      }
+    }
+    Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 43e0a4a..b2ac687 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -100,10 +101,11 @@ public class TestVectorizedOrcAcidRowBatchReader {
     conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
 
-    fs = FileSystem.getLocal(conf);
     Path workDir = new Path(System.getProperty("test.tmp.dir",
         "target" + File.separator + "test" + File.separator + "tmp"));
     root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump");
+    fs = root.getFileSystem(conf);
+    root = fs.makeQualified(root);
     fs.delete(root, true);
     ObjectInspector inspector;
     synchronized (TestOrcFile.class) {
@@ -189,7 +191,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     assertEquals(1, splitStrategies.size());
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
     assertEquals(1, splits.size());
-    assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
+    assertEquals(root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
         splits.get(0).getPath().toUri().toString());
     assertFalse(splits.get(0).isOriginal());
     return splits;
@@ -216,7 +218,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     // are being handled properly.
     conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5
 
-    VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL);
+    VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, new VectorizedRowBatchCtx());
     if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) {
       assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry);
     }
@@ -242,20 +244,4 @@ public class TestVectorizedOrcAcidRowBatchReader {
       }
     }
   }
-
-  @Test
-  public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
-    OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
-
-    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
-        AcidUtils.AcidOperationalProperties.getDefault().toInt());
-    Mockito.when(mockSplit.isOriginal()).thenReturn(true);
-    // Test false when trying to create a vectorized ACID row batch reader when reading originals.
-    assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
-
-    // A positive test case.
-    Mockito.when(mockSplit.isOriginal()).thenReturn(false);
-    assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original.q b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
new file mode 100644
index 0000000..ddf138d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
@@ -0,0 +1,138 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+-- enables vectorization of VirtualColumn.ROWID
+set hive.vectorized.row.identifier.enabled=true;
+-- enable ppd
+set hive.optimize.index.filter=true;
+
+set hive.explain.user=false;
+
+-- needed for TestCliDriver but not TestMiniTezCliDriver
+-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+-- attempts to get compaction to run - see below
+-- set yarn.scheduler.maximum-allocation-mb=2024;
+-- set hive.tez.container.size=500;
+-- set mapreduce.map.memory.mb=500;
+-- set mapreduce.reduce.memory.mb=500;
+-- set mapred.job.map.memory.mb=500;
+-- set mapred.job.reduce.memory.mb=500;
+
+
+
+CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker';
+create table mydual(a int);
+insert into mydual values(1);
+
+CREATE TABLE over10k(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           `dec` decimal(4,2),
+           bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+--oddly this has 9999 rows not > 10K
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k;
+
+CREATE TABLE over10k_orc_bucketed(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           `dec` decimal(4,2),
+           bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC;
+
+-- this produces about 250 distinct values across all 4 equivalence classes
+select distinct si, si%4 from over10k order by si;
+
+-- explain insert into over10k_orc_bucketed select * from over10k cluster by si;
+-- w/o "cluster by" all data is written to 000000_0
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+-- create copy_N files
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+-- the output of this is masked in .out - it is visible in .orig
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+
+--this actually shows the data files in the .out on Tez but not LLAP
+select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed;
+
+-- convert table to acid
+alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true');
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+--             Execution mode: vectorized, llap
+--             LLAP IO: may be used (ACID table)
+explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by  t, si, i;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+--             Execution mode: vectorized, llap
+--             LLAP IO: may be used (ACID table)
+explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+-- HIVE-17943
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+--            Execution mode: vectorized, llap
+--            LLAP IO: may be used (ACID table)
+explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+
+-- this should produce the same result (data) as previous time this exact query ran
+-- ROW__ID will be different (same bucketProperty)
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- The idea below was to do check sum queries to ensure that ROW__IDs are unique
+-- to run Compaction and to check that ROW__IDs are the same before and after compaction (for rows
+-- w/o applicable delete events)
+-- Everything below is commented out because it doesn't work
+-- group by ROW__ID produces wrong results
+-- compactor doesn't run - see error below...
+
+-- this doesn't vectorize but use llap which is perhaps the problem if it's using cache
+-- use explain VECTORIZATION DETAIL to see
+-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported
+explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- this tests that there are no duplicate ROW__IDs so should produce no output
+-- on LLAP this produces "NULL, 6"; on tez it produces nothing: HIVE-17921
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+-- this produces nothing (as it should)
+select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;
+
+-- schedule compactor
+-- alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500");;
+
+
+-- run compactor - this currently fails with
+-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512
+-- HIVE-17922
+-- select runWorker() from mydual;
+
+-- show compactions;
+
+-- this should produce the same result (data+ ROW__ID) as previous time this exact query ran
+-- select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this test that there are no duplicate ROW__IDs so should produce no output
+-- select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
new file mode 100644
index 0000000..4d93662
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
@@ -0,0 +1,125 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+-- enables vectorization of VirtualColumn.ROWID
+set hive.vectorized.row.identifier.enabled=true;
+-- enable ppd
+set hive.optimize.index.filter=true;
+
+set hive.explain.user=false;
+
+-- needed for TestCliDriver but not TestMiniTezCliDriver
+-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+-- attempts to get compaction to run - see below
+-- set yarn.scheduler.maximum-allocation-mb=2024;
+-- set hive.tez.container.size=500;
+-- set mapreduce.map.memory.mb=500;
+-- set mapreduce.reduce.memory.mb=500;
+-- set mapred.job.map.memory.mb=500;
+-- set mapred.job.reduce.memory.mb=500;
+
+
+
+CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker';
+create table mydual(a int);
+insert into mydual values(1);
+
+CREATE TABLE over10k(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           `dec` decimal(4,2),
+           bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+--oddly this has 9999 rows not > 10K
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k;
+
+CREATE TABLE over10k_orc_bucketed(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           `dec` decimal(4,2),
+           bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC;
+
+-- this produces about 250 distinct values across all 4 equivalence classes
+select distinct si, si%4 from over10k order by si;
+
+-- explain insert into over10k_orc_bucketed select * from over10k cluster by si;
+-- w/o "cluster by" all data is written to 000000_0
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+-- create copy_N files
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+-- the output of this is masked in .out - it is visible in .orig
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+
+--this actually shows the data files in the .out on Tez but not LLAP
+select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed;
+
+-- convert table to acid
+alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true');
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+--             Execution mode: vectorized (both Map and Reducer)
+explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by  t, si, i;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+--            Execution mode: vectorized
+explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- same as above
+explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+
+-- this should produce the same result (data) as previous time this exact query ran
+-- ROW__ID will be different (same bucketProperty)
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- The idea below was to do check sum queries to ensure that ROW__IDs are unique
+-- to run Compaction and to check that ROW__IDs are the same before and after compaction (for rows
+-- w/o applicable delete events)
+
+-- this doesn't vectorize
+-- use explain VECTORIZATION DETAIL to see
+-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported
+explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- this tests that there are no duplicate ROW__IDs so should produce no output
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- schedule compactor
+alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500");;
+
+
+--  run compactor - this currently fails with
+-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512
+-- select runWorker() from mydual;
+
+-- show compactions;
+
+-- this should produce the same (data + ROW__ID) as before compaction
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this tests that there are no duplicate ROW__IDs so should produce no output
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;


Mime
View raw message