hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject [8/8] hive git commit: HIVE-10565: Native Vector Map Join doesn't handle filtering and matching on LEFT OUTER JOIN repeated key correctly (Matt McCline via Gunther Hagleitner)
Date Fri, 15 May 2015 21:39:04 GMT
HIVE-10565: Native Vector Map Join doesn't handle filtering and matching on LEFT OUTER JOIN repeated key correctly (Matt McCline via Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d421201c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d421201c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d421201c

Branch: refs/heads/branch-1.2
Commit: d421201cc9b495e6a7f266d42ff7cfb93483b957
Parents: 878a282
Author: Gunther Hagleitner <gunther@apache.org>
Authored: Thu May 14 15:42:04 2015 -0700
Committer: Gunther Hagleitner <gunther@apache.org>
Committed: Fri May 15 14:35:35 2015 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   10 +
 .../ql/exec/vector/VectorizedBatchUtil.java     |    5 +-
 .../mapjoin/VectorMapJoinCommonOperator.java    |    8 +-
 .../VectorMapJoinGenerateResultOperator.java    |   47 +-
 ...pJoinInnerBigOnlyGenerateResultOperator.java |   53 +-
 .../VectorMapJoinInnerBigOnlyLongOperator.java  |   15 +-
 ...ctorMapJoinInnerBigOnlyMultiKeyOperator.java |   15 +-
 ...VectorMapJoinInnerBigOnlyStringOperator.java |   12 +-
 ...ectorMapJoinInnerGenerateResultOperator.java |   39 +-
 .../mapjoin/VectorMapJoinInnerLongOperator.java |   17 +-
 .../VectorMapJoinInnerMultiKeyOperator.java     |   19 +-
 .../VectorMapJoinInnerStringOperator.java       |   17 +-
 ...orMapJoinLeftSemiGenerateResultOperator.java |   40 +-
 .../VectorMapJoinLeftSemiLongOperator.java      |   13 +-
 .../VectorMapJoinLeftSemiMultiKeyOperator.java  |   17 +-
 .../VectorMapJoinLeftSemiStringOperator.java    |   17 +-
 ...ectorMapJoinOuterGenerateResultOperator.java |  805 ++++---
 .../mapjoin/VectorMapJoinOuterLongOperator.java |  189 +-
 .../VectorMapJoinOuterMultiKeyOperator.java     |  184 +-
 .../VectorMapJoinOuterStringOperator.java       |  185 +-
 .../mapjoin/VectorMapJoinRowBytesContainer.java |    2 +-
 .../fast/VectorMapJoinFastBytesHashMap.java     |    8 +-
 .../VectorMapJoinFastBytesHashMultiSet.java     |    4 +-
 .../fast/VectorMapJoinFastBytesHashTable.java   |   10 +-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java |   10 +-
 .../fast/VectorMapJoinFastLongHashMap.java      |    2 +-
 .../fast/VectorMapJoinFastLongHashTable.java    |   18 +-
 .../fast/VectorMapJoinFastTableContainer.java   |    2 +-
 .../fast/VectorMapJoinFastValueStore.java       |    8 +-
 .../VectorMapJoinOptimizedLongCommon.java       |    4 +-
 .../hive/ql/optimizer/physical/Vectorizer.java  |   24 +-
 .../test/queries/clientpositive/vector_join30.q |  160 ++
 .../clientpositive/vector_join_filters.q        |   38 +
 .../queries/clientpositive/vector_join_nulls.q  |   33 +
 .../clientpositive/vector_left_outer_join2.q    |    2 +
 .../queries/clientpositive/vector_outer_join5.q |  173 ++
 .../tez/acid_vectorization_partition.q.out      |   20 +-
 .../clientpositive/tez/vector_join30.q.out      | 1367 +++++++++++
 .../tez/vector_join_filters.q.out               |  222 ++
 .../clientpositive/tez/vector_join_nulls.q.out  |  195 ++
 .../tez/vector_left_outer_join2.q.out           |   20 +-
 .../tez/vector_left_outer_join3.q.out           |  222 ++
 .../clientpositive/tez/vector_outer_join5.q.out | 1328 +++++++++++
 .../tez/vectorized_timestamp_ints_casts.q.out   |  234 ++
 .../results/clientpositive/vector_join30.q.out  | 2194 ++++++++++++++++++
 .../clientpositive/vector_join_filters.q.out    |  222 ++
 .../clientpositive/vector_join_nulls.q.out      |  195 ++
 .../vector_left_outer_join2.q.out               |    8 +-
 .../clientpositive/vector_outer_join5.q.out     | 1300 +++++++++++
 49 files changed, 8936 insertions(+), 796 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index eeb46cc..0f2a831 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -220,8 +220,12 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
   vector_groupby_3.q,\
   vector_groupby_reduce.q,\
   vector_if_expr.q,\
+  vector_inner_join.q,\
   vector_interval_1.q,\
   vector_interval_2.q,\
+  vector_join30.q,\
+  vector_join_filters.q,\
+  vector_join_nulls.q,\
   vector_left_outer_join.q,\
   vector_left_outer_join2.q,\
   vector_leftsemi_mapjoin.q,\
@@ -230,6 +234,12 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
   vector_multi_insert.q,\
   vector_non_string_partition.q,\
   vector_orderby_5.q,\
+  vector_outer_join0.q,\
+  vector_outer_join1.q,\
+  vector_outer_join2.q,\
+  vector_outer_join3.q,\
+  vector_outer_join4.q,\
+  vector_outer_join5.q,\
   vector_partition_diff_num_cols.q,\
   vector_partitioned_date_time.q,\
   vector_reduce_groupby_decimal.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index dcea8ae..4a16b4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -645,8 +645,7 @@ public class VectorizedBatchUtil {
   public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
     StringBuffer sb = new StringBuffer();
     sb.append(prefix + " row " + index + " ");
-    for (int i = 0; i < batch.projectionSize; i++) {
-      int column = batch.projectedColumns[i];
+    for (int column = 0; column < batch.cols.length; column++) {
       ColumnVector colVector = batch.cols[column];
       if (colVector == null) {
         sb.append("(null colVector " + column + ")");
@@ -666,7 +665,7 @@ public class VectorizedBatchUtil {
             if (bytes == null) {
               sb.append("(Unexpected null bytes with start " + start + " length " + length + ")");
             } else {
-              sb.append(displayBytes(bytes, start, length));
+              sb.append("bytes: '" + displayBytes(bytes, start, length) + "'");
             }
           } else if (colVector instanceof DecimalColumnVector) {
             sb.append(((DecimalColumnVector) colVector).vector[index].toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index a9082eb..af78776 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -470,8 +470,8 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
       LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor bigTableByteColumnVectorColumns " + Arrays.toString(bigTableByteColumnVectorColumns));
       LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor smallTableByteColumnVectorColumns " + Arrays.toString(smallTableByteColumnVectorColumns));
 
-      LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputProjection " + Arrays.toString(outputProjection));
-      LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputTypeNames " + Arrays.toString(outputTypeNames));
+      LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputProjection " + Arrays.toString(outputProjection));
+      LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputTypeNames " + Arrays.toString(outputTypeNames));
     }
 
     setupVOutContext(conf.getOutputColumnNames());
@@ -503,7 +503,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
    */
   protected void setupVOutContext(List<String> outputColumnNames) {
     if (LOG.isDebugEnabled()) {
-      LOG.info(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputColumnNames " + outputColumnNames);
+      LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator constructor outputColumnNames " + outputColumnNames);
     }
     if (outputColumnNames.size() != outputProjection.length) {
       throw new RuntimeException("Output column names " + outputColumnNames + " length and output projection " + Arrays.toString(outputProjection) + " / " + Arrays.toString(outputTypeNames) + " length mismatch");
@@ -729,9 +729,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
    * Common one time setup by native vectorized map join operator's processOp.
    */
   protected void commonSetup(VectorizedRowBatch batch) throws HiveException {
-    LOG.info("VectorMapJoinInnerCommonOperator commonSetup begin...");
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("VectorMapJoinInnerCommonOperator commonSetup begin...");
       displayBatchColumns(batch, "batch");
       displayBatchColumns(overflowBatch, "overflowBatch");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 860ebb5..32c126c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -373,10 +373,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
    *          The big table batch.
    * @param hashMapResult
    *          The hash map results for the repeated key.
-   * @return
-   *          The new count of selected rows.
    */
-  protected int generateHashMapResultRepeatedAll(VectorizedRowBatch batch,
+  protected void generateHashMapResultRepeatedAll(VectorizedRowBatch batch,
               VectorMapJoinHashMapResult hashMapResult) throws IOException, HiveException {
 
     int[] selected = batch.selected;
@@ -400,7 +398,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
           batch.selected, 0, batch.size);
     }
 
-    return numSel;
+    batch.size = numSel;
   }
 
   //-----------------------------------------------------------------------------------------------
@@ -462,7 +460,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 //  int length = output.getLength() - offset;
     rowBytesContainer.finishRow();
 
-//  LOG.info("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length);
+//  LOG.debug("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length);
   }
 
   protected void spillHashMapBatch(VectorizedRowBatch batch,
@@ -514,14 +512,18 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
         smallTable);
     needHashTableSetup = true;
 
-    LOG.info(CLASS_NAME + " reloadHashTable!");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(CLASS_NAME + " reloadHashTable!");
+    }
   }
 
   @Override
   protected void reProcessBigTable(int partitionId)
       throws HiveException {
 
-    LOG.info(CLASS_NAME + " reProcessBigTable enter...");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(CLASS_NAME + " reProcessBigTable enter...");
+    }
 
     if (spillReplayBatch == null) {
       // The process method was not called -- no big table rows.
@@ -544,14 +546,14 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
         int offset = bigTable.currentOffset();
         int length = bigTable.currentLength();
 
-//      LOG.info(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length);
+//      LOG.debug(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length);
 
         bigTableVectorDeserializeRow.setBytes(bytes, offset, length);
         bigTableVectorDeserializeRow.deserializeByValue(spillReplayBatch, spillReplayBatch.size);
         spillReplayBatch.size++;
 
         if (spillReplayBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
-          LOG.info("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows");
+          // LOG.debug("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows");
           process(spillReplayBatch, posBigTable); // call process once we have a full batch
           spillReplayBatch.reset();
           batchCount++;
@@ -559,7 +561,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
       }
       // Process the row batch that has less than DEFAULT_SIZE rows
       if (spillReplayBatch.size > 0) {
-        LOG.info("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows");
+        // LOG.debug("reProcessBigTable going to call process with spillReplayBatch.size " + spillReplayBatch.size + " rows");
         process(spillReplayBatch, posBigTable);
         spillReplayBatch.reset();
         batchCount++;
@@ -570,7 +572,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
       throw new HiveException(e);
     }
 
-    LOG.info(CLASS_NAME + " reProcessBigTable exit! " + rowCount + " row processed and " + batchCount + " batches processed");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(CLASS_NAME + " reProcessBigTable exit! " + rowCount + " row processed and " + batchCount + " batches processed");
+    }
   }
 
 
@@ -632,7 +636,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     if (!aborted && overflowBatch.size > 0) {
       forwardOverflow();
     }
-    LOG.info("VectorMapJoinInnerLongOperator closeOp " + batchCounter + " batches processed");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("VectorMapJoinInnerLongOperator closeOp " + batchCounter + " batches processed");
+    }
   }
 
   //-----------------------------------------------------------------------------------------------
@@ -641,6 +647,23 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
    * Debug.
    */
 
+  public boolean verifyMonotonicallyIncreasing(int[] selected, int size) {
+
+    if (size == 0) {
+      return true;
+    }
+    int prevBatchIndex = selected[0];
+
+    for (int i = 1; i < size; i++) {
+      int batchIndex = selected[i];
+      if (batchIndex <= prevBatchIndex) {
+        return false;
+      }
+      prevBatchIndex = batchIndex;
+    }
+    return true;
+  }
+
   public static String intArrayToRangesString(int selection[], int size) {
     if (size == 0) {
       return "[]";

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
index 3132531..f18b982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
@@ -129,22 +129,10 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
    * @param batch
    *          The big table batch with any matching and any non matching rows both as
    *          selected in use.
-   * @param allMatchs
-   *          A subset of the rows of the batch that are matches.
    * @param allMatchCount
    *          Number of matches in allMatchs.
-   * @param equalKeySeriesValueCounts
-   *          For each equal key series, whether the number of (empty) small table values.
-   * @param equalKeySeriesAllMatchIndices
-   *          For each equal key series, the logical index into allMatchs.
-   * @param equalKeySeriesDuplicateCounts
-   *          For each equal key series, the number of duplicates or equal keys.
    * @param equalKeySeriesCount
    *          Number of single value matches.
-   * @param spills
-   *          A subset of the rows of the batch that are spills.
-   * @param spillHashMapResultIndices
-   *          For each entry in spills, the index into the hashMapResult.
    * @param spillCount
    *          Number of spills in spills.
    * @param hashTableResults
@@ -154,15 +142,16 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
    *          Number of entries in hashMapResults.
    *
    **/
-  protected int finishInnerBigOnly(VectorizedRowBatch batch,
-      int[] allMatchs, int allMatchCount,
-      long[] equalKeySeriesValueCounts, int[] equalKeySeriesAllMatchIndices,
-      int[] equalKeySeriesDuplicateCounts, int equalKeySeriesCount,
-      int[] spills, int[] spillHashMapResultIndices, int spillCount,
+  protected void finishInnerBigOnly(VectorizedRowBatch batch,
+      int allMatchCount, int equalKeySeriesCount, int spillCount,
       VectorMapJoinHashTableResult[] hashTableResults, int hashMapResultCount)
           throws HiveException, IOException {
 
-    int numSel = 0;
+    // Get rid of spills before we start modifying the batch.
+    if (spillCount > 0) {
+      spillHashMapBatch(batch, hashTableResults,
+          spills, spillHashMapResultIndices, spillCount);
+    }
 
     /*
      * Optimize by running value expressions only over the matched rows.
@@ -171,6 +160,7 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
       performValueExpressions(batch, allMatchs, allMatchCount);
     }
 
+    int numSel = 0;
     for (int i = 0; i < equalKeySeriesCount; i++) {
       long count = equalKeySeriesValueCounts[i];
       int allMatchesIndex = equalKeySeriesAllMatchIndices[i];
@@ -185,13 +175,8 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
             duplicateCount, count);
       }
     }
-
-    if (spillCount > 0) {
-      spillHashMapBatch(batch, hashTableResults,
-          spills, spillHashMapResultIndices, spillCount);
-    }
-
-    return numSel;
+    batch.size = numSel;
+    batch.selectedInUse = true;
   }
 
   /**
@@ -215,11 +200,11 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
       int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel)
           throws HiveException, IOException {
 
-    // LOG.info("generateHashMultiSetResultSingleValue enter...");
+    // LOG.debug("generateHashMultiSetResultSingleValue enter...");
 
     // Generate result within big table batch itself.
 
-    // LOG.info("generateHashMultiSetResultSingleValue with big table...");
+    // LOG.debug("generateHashMultiSetResultSingleValue with big table...");
 
     for (int i = 0; i < duplicateCount; i++) {
 
@@ -250,7 +235,7 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
       int[] allMatchs, int allMatchesIndex,
       int duplicateCount, long count) throws HiveException, IOException {
 
-    // LOG.info("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count);
+    // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count);
 
     // TODO: Look at repeating optimizations...
 
@@ -309,11 +294,9 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
     return 0;
   }
 
-  protected int finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
+  protected void finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
       VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException, IOException {
 
-    int numSel = 0;
-
     switch (joinResult) {
     case MATCH:
 
@@ -325,19 +308,21 @@ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
       }
 
       // Generate special repeated case.
-      numSel = generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult);
+      int numSel = generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult);
+      batch.size = numSel;
+      batch.selectedInUse = true;
       break;
 
     case SPILL:
       // Whole batch is spilled.
       spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMultiSetResult);
+      batch.size = 0;
       break;
 
     case NOMATCH:
       // No match for entire batch.
+      batch.size = 0;
       break;
     }
-
-    return numSel;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
index 53a91d8..bb7efda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
@@ -151,9 +151,6 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Single-Column Long specific declarations.
        */
@@ -198,7 +195,7 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
+        finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
       } else {
 
         /*
@@ -358,17 +355,11 @@ public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBig
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount)));
         }
 
-        numSel = finishInnerBigOnly(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices,
-            equalKeySeriesDuplicateCounts, equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishInnerBigOnly(batch,
+            allMatchCount, equalKeySeriesCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
index 9553fa0..c36f668 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
@@ -156,9 +156,6 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Multi-Key specific declarations.
        */
@@ -210,7 +207,7 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
+        finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
       } else {
 
         /*
@@ -371,17 +368,11 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount)));
         }
 
-        numSel = finishInnerBigOnly(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices,
-            equalKeySeriesDuplicateCounts, equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishInnerBigOnly(batch,
+            allMatchCount, equalKeySeriesCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
index 17d0b63..87a11c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
@@ -187,7 +187,7 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
+        finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
       } else {
 
         /*
@@ -347,17 +347,11 @@ public class VectorMapJoinInnerBigOnlyStringOperator extends VectorMapJoinInnerB
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount)));
         }
 
-        numSel = finishInnerBigOnly(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesValueCounts, equalKeySeriesAllMatchIndices,
-            equalKeySeriesDuplicateCounts, equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishInnerBigOnly(batch,
+            allMatchCount, equalKeySeriesCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashMultiSetResults, hashMultiSetResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java
index 3a5e4b2..ee1abd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java
@@ -147,38 +147,17 @@ public abstract class VectorMapJoinInnerGenerateResultOperator
    * @param batch
    *          The big table batch with any matching and any non matching rows both as
    *          selected in use.
-   * @param allMatchs
-   *          A subset of the rows of the batch that are matches.
    * @param allMatchCount
    *          Number of matches in allMatchs.
-   * @param equalKeySeriesHashMapResultIndices
-   *          For each equal key series, the index into the hashMapResult.
-   * @param equalKeySeriesAllMatchIndices
-   *          For each equal key series, the logical index into allMatchs.
-   * @param equalKeySeriesIsSingleValue
-   *          For each equal key series, whether there is 1 or multiple small table values.
-   * @param equalKeySeriesDuplicateCounts
-   *          For each equal key series, the number of duplicates or equal keys.
    * @param equalKeySeriesCount
    *          Number of single value matches.
-   * @param spills
-   *          A subset of the rows of the batch that are spills.
-   * @param spillHashMapResultIndices
-   *          For each entry in spills, the index into the hashMapResult.
    * @param spillCount
    *          Number of spills in spills.
-   * @param hashMapResults
-   *          The array of all hash map results for the batch.
    * @param hashMapResultCount
    *          Number of entries in hashMapResults.
    */
-  protected int finishInner(VectorizedRowBatch batch,
-      int[] allMatchs, int allMatchCount,
-      int[] equalKeySeriesHashMapResultIndices, int[] equalKeySeriesAllMatchIndices,
-      boolean[] equalKeySeriesIsSingleValue, int[] equalKeySeriesDuplicateCounts,
-      int equalKeySeriesCount,
-      int[] spills, int[] spillHashMapResultIndices, int spillCount,
-      VectorMapJoinHashMapResult[] hashMapResults, int hashMapResultCount)
+  protected void finishInner(VectorizedRowBatch batch,
+      int allMatchCount, int equalKeySeriesCount, int spillCount, int hashMapResultCount)
           throws HiveException, IOException {
 
     int numSel = 0;
@@ -211,10 +190,11 @@ public abstract class VectorMapJoinInnerGenerateResultOperator
           spills, spillHashMapResultIndices, spillCount);
     }
 
-    return numSel;
+    batch.size = numSel;
+    batch.selectedInUse = true;
   }
 
-  protected int finishInnerRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
+  protected void finishInnerRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
       VectorMapJoinHashTableResult hashMapResult) throws HiveException, IOException {
 
     int numSel = 0;
@@ -230,22 +210,19 @@ public abstract class VectorMapJoinInnerGenerateResultOperator
       }
 
       // Generate special repeated case.
-      numSel = generateHashMapResultRepeatedAll(batch, hashMapResults[0]);
+      generateHashMapResultRepeatedAll(batch, hashMapResults[0]);
       break;
 
     case SPILL:
       // Whole batch is spilled.
       spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMapResults[0]);
+      batch.size = 0;
       break;
 
     case NOMATCH:
       // No match for entire batch.
+      batch.size = 0;
       break;
     }
-    /*
-     * Common repeated join result processing.
-     */
-
-    return numSel;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
index b77a93c..9005d00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
@@ -149,9 +149,6 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Single-Column Long specific declarations.
        */
@@ -196,7 +193,7 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]);
+        finishInnerRepeated(batch, joinResult, hashMapResults[0]);
       } else {
 
         /*
@@ -356,18 +353,10 @@ public class VectorMapJoinInnerLongOperator extends VectorMapJoinInnerGenerateRe
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount)));
         }
 
-        numSel = finishInner(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices,
-            equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts,
-            equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
-            hashMapResults, hashMapResultCount);
+        finishInner(batch,
+            allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
index 938506b..b13ded6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
@@ -153,9 +153,6 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Multi-Key specific declarations.
        */
@@ -207,7 +204,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]);
+        finishInnerRepeated(batch, joinResult, hashMapResults[0]);
       } else {
 
         /*
@@ -279,7 +276,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
             haveSaveKey = true;
 
             /*
-             * Multi-Key specific save key and lookup.
+             * Multi-Key specific save key.
              */
 
             temp = saveKeyOutput;
@@ -368,18 +365,10 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount)));
         }
 
-        numSel = finishInner(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices,
-            equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts,
-            equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
-            hashMapResults, hashMapResultCount);
+        finishInner(batch,
+            allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
index f7dd8e2..9f10ff1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
@@ -140,9 +140,6 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Single-Column String specific declarations.
        */
@@ -185,7 +182,7 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishInnerRepeated(batch, joinResult, hashMapResults[0]);
+        finishInnerRepeated(batch, joinResult, hashMapResults[0]);
       } else {
 
         /*
@@ -345,18 +342,10 @@ public class VectorMapJoinInnerStringOperator extends VectorMapJoinInnerGenerate
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMapResults, 0, hashMapResultCount)));
         }
 
-        numSel = finishInner(batch,
-            allMatchs, allMatchCount,
-            equalKeySeriesHashMapResultIndices, equalKeySeriesAllMatchIndices,
-            equalKeySeriesIsSingleValue, equalKeySeriesDuplicateCounts,
-            equalKeySeriesCount,
-            spills, spillHashMapResultIndices, spillCount,
-            hashMapResults, hashMapResultCount);
+        finishInner(batch,
+            allMatchCount, equalKeySeriesCount, spillCount, hashMapResultCount);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java
index 230f9fe..07393b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java
@@ -111,26 +111,23 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator
    * @param batch
    *          The big table batch with any matching and any non matching rows both as
    *          selected in use.
-   * @param allMatchs
-   *          A subset of the rows of the batch that are matches.
    * @param allMatchCount
    *          Number of matches in allMatchs.
-   * @param spills
-   *          A subset of the rows of the batch that are spills.
-   * @param spillHashMapResultIndices
-   *          For each entry in spills, the index into the hashTableResults.
    * @param spillCount
    *          Number of spills in spills.
    * @param hashTableResults
    *          The array of all hash table results for the batch. We need the
    *          VectorMapJoinHashTableResult for the spill information.
    */
-  protected int finishLeftSemi(VectorizedRowBatch batch,
-      int[] allMatchs, int allMatchCount,
-      int[] spills, int[] spillHashMapResultIndices, int spillCount,
+  protected void finishLeftSemi(VectorizedRowBatch batch,
+      int allMatchCount, int spillCount,
       VectorMapJoinHashTableResult[] hashTableResults) throws HiveException, IOException {
 
-    int numSel;
+    // Get rid of spills before we start modifying the batch.
+    if (spillCount > 0) {
+      spillHashMapBatch(batch, hashTableResults,
+          spills, spillHashMapResultIndices, spillCount);
+    }
 
     /*
      * Optimize by running value expressions only over the matched rows.
@@ -139,14 +136,9 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator
       performValueExpressions(batch, allMatchs, allMatchCount);
     }
 
-    numSel = generateHashSetResults(batch, allMatchs, allMatchCount);
-
-    if (spillCount > 0) {
-      spillHashMapBatch(batch, hashTableResults,
-          spills, spillHashMapResultIndices, spillCount);
-    }
-
-    return numSel;
+    int numSel = generateHashSetResults(batch, allMatchs, allMatchCount);
+    batch.size = numSel;
+    batch.selectedInUse = true;
   }
 
   /**
@@ -199,11 +191,9 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator
     return batch.size;
   }
 
-  protected int finishLeftSemiRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
+  protected void finishLeftSemiRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
       VectorMapJoinHashTableResult hashSetResult) throws HiveException, IOException {
 
-    int numSel = 0;
-
     switch (joinResult) {
     case MATCH:
 
@@ -215,19 +205,21 @@ public abstract class VectorMapJoinLeftSemiGenerateResultOperator
       }
 
       // Generate special repeated case.
-      numSel = generateHashSetResultRepeatedAll(batch);
+      int numSel = generateHashSetResultRepeatedAll(batch);
+      batch.size = numSel;
+      batch.selectedInUse = true;
       break;
 
     case SPILL:
       // Whole batch is spilled.
       spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashSetResult);
+      batch.size = 0;
       break;
 
     case NOMATCH:
       // No match for entire batch.
+      batch.size = 0;
       break;
     }
-
-    return numSel;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
index 75aeefb..712978a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
@@ -151,9 +151,6 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Single-Column Long specific declarations.
        */
@@ -198,7 +195,7 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
+        finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
       } else {
 
         /*
@@ -348,15 +345,11 @@ public class VectorMapJoinLeftSemiLongOperator extends VectorMapJoinLeftSemiGene
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount)));
         }
 
-        numSel = finishLeftSemi(batch,
-            allMatchs, allMatchCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishLeftSemi(batch,
+            allMatchCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashSetResults);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
index ea287f4..b941431 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
@@ -155,9 +155,6 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Multi-Key specific declarations.
        */
@@ -210,7 +207,7 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
+        finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
       } else {
 
         /*
@@ -291,6 +288,10 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
             saveKeyOutput = currentKeyOutput;
             currentKeyOutput = temp;
 
+            /*
+             * Multi-key specific lookup key.
+             */
+
             byte[] keyBytes = saveKeyOutput.getData();
             int keyLength = saveKeyOutput.getLength();
             saveJoinResult = hashSet.contains(keyBytes, 0, keyLength, hashSetResults[hashSetResultCount]);
@@ -360,15 +361,11 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount)));
         }
 
-        numSel = finishLeftSemi(batch,
-            allMatchs, allMatchCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishLeftSemi(batch,
+            allMatchCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashSetResults);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
index 116cb81..9ff1141 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
@@ -142,9 +142,6 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe
         }
       }
 
-      // We rebuild in-place the selected array with rows destine to be forwarded.
-      int numSel = 0;
-
       /*
        * Single-Column String specific declarations.
        */
@@ -187,7 +184,7 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe
         if (LOG.isDebugEnabled()) {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
-        numSel = finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
+        finishLeftSemiRepeated(batch, joinResult, hashSetResults[0]);
       } else {
 
         /*
@@ -263,6 +260,10 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe
 
             saveKeyBatchIndex = batchIndex;
 
+            /*
+             * Single-Column String specific lookup key.
+             */
+
             byte[] keyBytes = vector[batchIndex];
             int keyStart = start[batchIndex];
             int keyLength = length[batchIndex];
@@ -333,15 +334,11 @@ public class VectorMapJoinLeftSemiStringOperator extends VectorMapJoinLeftSemiGe
               " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashSetResults, 0, hashSetResultCount)));
         }
 
-        numSel = finishLeftSemi(batch,
-            allMatchs, allMatchCount,
-            spills, spillHashMapResultIndices, spillCount,
+        finishLeftSemi(batch,
+            allMatchCount, spillCount,
             (VectorMapJoinHashTableResult[]) hashSetResults);
       }
 
-      batch.selectedInUse = true;
-      batch.size =  numSel;
-
       if (batch.size > 0) {
         // Forward any remaining selected rows.
         forwardBigTableBatch(batch);

http://git-wip-us.apache.org/repos/asf/hive/blob/d421201c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
index 7ef5574..57814fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
@@ -70,15 +70,34 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
   // generation.
   protected transient VectorMapJoinHashMapResult hashMapResults[];
 
-  // Pre-allocated member for storing any matching row indexes during a processOp call.
-  protected transient int[] matchs;
+  // Pre-allocated member for remembering the big table's selected array at the beginning of
+  // the process method before applying any filter.  For outer join we need to remember which
+  // rows did not match since they will appear the in outer join result with NULLs for the
+  // small table.
+  protected transient int[] inputSelected;
 
-  // Pre-allocated member for storing the mapping to the row batchIndex of the first of a series of
-  // equal keys that was looked up during a processOp call.
-  protected transient int[] matchHashMapResultIndices;
+  // Pre-allocated member for storing the (physical) batch index of matching row (single- or
+  // multi-small-table-valued) indexes during a process call.
+  protected transient int[] allMatchs;
 
-  // All matching and non-matching big table rows.
-  protected transient int[] nonSpills;
+  /*
+   *  Pre-allocated members for storing information equal key series for small-table matches.
+   *
+   *  ~HashMapResultIndices
+   *                Index into the hashMapResults array for the match.
+   *  ~AllMatchIndices
+   *                (Logical) indices into allMatchs to the first row of a match of a
+   *                possible series of duplicate keys.
+   *  ~IsSingleValue
+   *                Whether there is 1 or multiple small table values.
+   *  ~DuplicateCounts
+   *                The duplicate count for each matched key.
+   *
+   */
+  protected transient int[] equalKeySeriesHashMapResultIndices;
+  protected transient int[] equalKeySeriesAllMatchIndices;
+  protected transient boolean[] equalKeySeriesIsSingleValue;
+  protected transient int[] equalKeySeriesDuplicateCounts;
 
   // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled.
   protected transient int[] spills;
@@ -86,8 +105,11 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
   // Pre-allocated member for storing index into the hashSetResults for each spilled row.
   protected transient int[] spillHashMapResultIndices;
 
-  // Pre-allocated member for storing any non-matching row indexes during a processOp call.
-  protected transient int[] scratch1;
+  // Pre-allocated member for storing any non-spills, non-matches, or merged row indexes during a
+  // process method call.
+  protected transient int[] nonSpills;
+  protected transient int[] noMatchs;
+  protected transient int[] merged;
 
   public VectorMapJoinOuterGenerateResultOperator() {
     super();
@@ -111,12 +133,23 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
     for (int i = 0; i < hashMapResults.length; i++) {
       hashMapResults[i] = baseHashMap.createHashMapResult();
     }
-    matchs = new int[batch.DEFAULT_SIZE];
-    matchHashMapResultIndices = new int[batch.DEFAULT_SIZE];
-    nonSpills = new int[batch.DEFAULT_SIZE];
+
+    inputSelected = new int[batch.DEFAULT_SIZE];
+
+    allMatchs = new int[batch.DEFAULT_SIZE];
+
+    equalKeySeriesHashMapResultIndices = new int[batch.DEFAULT_SIZE];
+    equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE];
+    equalKeySeriesIsSingleValue = new boolean[batch.DEFAULT_SIZE];
+    equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE];
+
     spills = new int[batch.DEFAULT_SIZE];
     spillHashMapResultIndices = new int[batch.DEFAULT_SIZE];
-    scratch1 = new int[batch.DEFAULT_SIZE];
+
+    nonSpills = new int[batch.DEFAULT_SIZE];
+    noMatchs = new int[batch.DEFAULT_SIZE];
+    merged = new int[batch.DEFAULT_SIZE];
+
   }
 
   //-----------------------------------------------------------------------------------------------
@@ -145,260 +178,372 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
   }
 
   /**
-   * Generate the outer join output results for one vectorized row batch.
-   *
-   * Any filter expressions will apply now since hash map lookup for outer join is complete.
+   * Apply the value expression to rows in the (original) input selected array.
    *
    * @param batch
-   *          The big table batch with any matching and any non matching rows both as
-   *          selected in use.
-   * @param matchs
-   *          A subset of the rows of the batch that are matches.
-   * @param matchHashMapResultIndices
-   *          For each entry in matches, the index into the hashMapResult.
-   * @param matchSize
-   *          Number of matches in matchs.
-   * @param nonSpills
-   *          The rows of the batch that are both matches and non-matches.
-   * @param nonspillCount
-   *          Number of rows in nonSpills.
-   * @param spills
-   *          A subset of the rows of the batch that are spills.
-   * @param spillHashMapResultIndices
-   *          For each entry in spills, the index into the hashMapResult.
-   * @param spillCount
-   *          Number of spills in spills.
-   * @param hashMapResults
-   *          The array of all hash map results for the batch.
-   * @param hashMapResultCount
-   *          Number of entries in hashMapResults.
-   * @param scratch1
-   *          Pre-allocated storage to internal use.
+   *          The vectorized row batch.
+   * @param inputSelectedInUse
+   *          Whether the (original) input batch is selectedInUse.
+   * @param inputLogicalSize
+   *          The (original) input batch size.
    */
-  public int finishOuter(VectorizedRowBatch batch,
-      int[] matchs, int[] matchHashMapResultIndices, int matchCount,
-      int[] nonSpills, int nonSpillCount,
-      int[] spills, int[] spillHashMapResultIndices, int spillCount,
-      VectorMapJoinHashMapResult[] hashMapResults, int hashMapResultCount,
-      int[] scratch1) throws IOException, HiveException {
-
-     int numSel = 0;
-
-    // At this point we have determined the matching rows only for the ON equality condition(s).
-    // Implicitly, non-matching rows are those in the selected array minus matchs.
+  private void doValueExprOnInputSelected(VectorizedRowBatch batch,
+      boolean inputSelectedInUse, int inputLogicalSize) {
 
-    // Next, for outer join, apply any ON predicates to filter down the matches.
-    if (matchCount > 0 && bigTableFilterExpressions.length > 0) {
+    int saveBatchSize = batch.size;
+    int[] saveSelected = batch.selected;
+    boolean saveSelectedInUse = batch.selectedInUse;
 
-      System.arraycopy(matchs, 0, batch.selected, 0, matchCount);
-      batch.size = matchCount;
+    batch.size = inputLogicalSize;
+    batch.selected = inputSelected;
+    batch.selectedInUse = inputSelectedInUse;
 
-      // Non matches will be removed from the selected array.
-      for (VectorExpression ve : bigTableFilterExpressions) {
+    if (bigTableValueExpressions != null) {
+      for(VectorExpression ve: bigTableValueExpressions) {
         ve.evaluate(batch);
       }
+    }
 
-      // LOG.info("finishOuter" +
-      //     " filtered batch.selected " + Arrays.toString(Arrays.copyOfRange(batch.selected, 0, batch.size)));
-
-      // Fixup the matchHashMapResultIndices array.
-      if (batch.size < matchCount) {
-        int numMatch = 0;
-        int[] selected = batch.selected;
-        for (int i = 0; i < batch.size; i++) {
-          if (selected[i] == matchs[numMatch]) {
-            matchHashMapResultIndices[numMatch] = matchHashMapResultIndices[i];
-            numMatch++;
-            if (numMatch == matchCount) {
-              break;
-            }
-          }
-        }
-        System.arraycopy(batch.selected, 0, matchs, 0, matchCount);
+    batch.size = saveBatchSize;
+    batch.selected = saveSelected;
+    batch.selectedInUse = saveSelectedInUse;
+  }
+
+  /**
+   * Apply the value expression to rows specified by a selected array.
+   *
+   * @param batch
+   *          The vectorized row batch.
+   * @param selected
+   *          The (physical) batch indices to apply the expression to.
+   * @param size
+   *          The size of selected.
+   */
+  private void doValueExpr(VectorizedRowBatch batch,
+      int[] selected, int size) {
+
+    int saveBatchSize = batch.size;
+    int[] saveSelected = batch.selected;
+    boolean saveSelectedInUse = batch.selectedInUse;
+
+    batch.size = size;
+    batch.selected = selected;
+    batch.selectedInUse = true;
+
+    if (bigTableValueExpressions != null) {
+      for(VectorExpression ve: bigTableValueExpressions) {
+        ve.evaluate(batch);
       }
     }
-    // LOG.info("finishOuter" +
-    //     " matchs[" + matchCount + "] " + intArrayToRangesString(matchs, matchCount) +
-    //     " matchHashMapResultIndices " + Arrays.toString(Arrays.copyOfRange(matchHashMapResultIndices, 0, matchCount)));
 
-    // Big table value expressions apply to ALL matching and non-matching rows.
-    if (bigTableValueExpressions != null) {
+    batch.size = saveBatchSize;
+    batch.selected = saveSelected;
+    batch.selectedInUse = saveSelectedInUse;
+  }
 
-      System.arraycopy(nonSpills, 0, batch.selected, 0, nonSpillCount);
-      batch.size = nonSpillCount;
+  /**
+   * Remove (subtract) members from the input selected array and produce the results into
+   * a difference array.
+   *
+   * @param inputSelectedInUse
+   *          Whether the (original) input batch is selectedInUse.
+   * @param inputLogicalSize
+   *          The (original) input batch size.
+   * @param remove
+   *          The indices to remove.  They must all be present in input selected array.
+   * @param removeSize
+   *          The size of remove.
+   * @param difference
+   *          The resulting difference -- the input selected array indices not in the
+   *          remove array.
+   * @return
+   *          The resulting size of the difference array.
+   * @throws HiveException 
+   */
+  private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize,
+      int[] remove, int removeSize, int[] difference) throws HiveException {
 
-      for (VectorExpression ve: bigTableValueExpressions) {
-        ve.evaluate(batch);
+    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
+    //   throw new HiveException("remove is not in sort order and unique");
+    // }
+
+   int differenceCount = 0;
+
+   // Determine which rows are left.
+   int removeIndex = 0;
+   if (inputSelectedInUse) {
+     for (int i = 0; i < inputLogicalSize; i++) {
+       int candidateIndex = inputSelected[i];
+       if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+         removeIndex++;
+       } else {
+         difference[differenceCount++] = candidateIndex;
+       }
+     }
+   } else {
+     for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) {
+       if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+         removeIndex++;
+       } else {
+         difference[differenceCount++] = candidateIndex;
+       }
+     }
+   }
+
+   if (removeIndex != removeSize) {
+     throw new HiveException("Not all batch indices removed");
+   }
+
+   // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) {
+   //   throw new HiveException("difference is not in sort order and unique");
+   // }
+
+   return differenceCount;
+ }
+
+  /**
+   * Remove (subtract) members from an array and produce the results into
+   * a difference array.
+
+   * @param all
+   *          The selected array containing all members.
+   * @param allSize
+   *          The size of all.
+   * @param remove
+   *          The indices to remove.  They must all be present in input selected array.
+   * @param removeSize
+   *          The size of remove.
+   * @param difference
+   *          The resulting difference -- the all array indices not in the
+   *          remove array.
+   * @return
+   *          The resulting size of the difference array.
+   * @throws HiveException 
+   */
+  private int subtract(int[] all, int allSize,
+      int[] remove, int removeSize, int[] difference) throws HiveException {
+
+    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
+    //   throw new HiveException("remove is not in sort order and unique");
+    // }
+
+    int differenceCount = 0;
+
+    // Determine which rows are left.
+    int removeIndex = 0;
+    for (int i = 0; i < allSize; i++) {
+      int candidateIndex = all[i];
+      if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+        removeIndex++;
+      } else {
+        difference[differenceCount++] = candidateIndex;
       }
     }
 
-    // Determine which rows are non matches by determining the delta between selected and
-    // matchs.
-    int[] noMatchs = scratch1;
-    int noMatchCount = 0;
-    if (matchCount < nonSpillCount) {
-      // Determine which rows are non matches.
-      int matchIndex = 0;
-      for (int i = 0; i < nonSpillCount; i++) {
-        int candidateIndex = nonSpills[i];
-        if (matchIndex < matchCount && candidateIndex == matchs[matchIndex]) {
-          matchIndex++;
+    if (removeIndex != removeSize) {
+      throw new HiveException("Not all batch indices removed");
+    }
+
+    return differenceCount;
+  }
+
+  /**
+   * Sort merge two select arrays so the resulting array is ordered by (batch) index.
+   *
+   * @param selected1
+   * @param selected1Count
+   * @param selected2
+   * @param selected2Count
+   * @param sortMerged
+   *          The resulting sort merge of selected1 and selected2.
+   * @return
+   *          The resulting size of the sortMerged array.
+   * @throws HiveException 
+   */
+  private int sortMerge(int[] selected1, int selected1Count,
+          int[] selected2, int selected2Count, int[] sortMerged) throws HiveException {
+
+    // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) {
+    //   throw new HiveException("selected1 is not in sort order and unique");
+    // }
+
+    // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) {
+    //   throw new HiveException("selected1 is not in sort order and unique");
+    // }
+
+
+    int sortMergeCount = 0;
+
+    int selected1Index = 0;
+    int selected2Index = 0;
+    for (int i = 0; i < selected1Count + selected2Count; i++) {
+      if (selected1Index < selected1Count && selected2Index < selected2Count) {
+        if (selected1[selected1Index] < selected2[selected2Index]) {
+          sortMerged[sortMergeCount++] = selected1[selected1Index++];
         } else {
-          noMatchs[noMatchCount++] = candidateIndex;
+          sortMerged[sortMergeCount++] = selected2[selected2Index++];
         }
+      } else if (selected1Index < selected1Count) {
+        sortMerged[sortMergeCount++] = selected1[selected1Index++];
+      } else {
+        sortMerged[sortMergeCount++] = selected2[selected2Index++];
       }
     }
-    // LOG.info("finishOuter" +
-    //     " noMatchs[" + noMatchCount + "] " + intArrayToRangesString(noMatchs, noMatchCount));
 
+    // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) {
+    //   throw new HiveException("sortMerged is not in sort order and unique");
+    // }
 
-    // When we generate results into the overflow batch, we may still end up with fewer rows
-    // in the big table batch.  So, nulSel and the batch's selected array will be rebuilt with
-    // just the big table rows that need to be forwarded, minus any rows processed with the
-    // overflow batch.
-    if (matchCount > 0) {
-      numSel = generateOuterHashMapMatchResults(batch,
-          matchs, matchHashMapResultIndices, matchCount,
-          hashMapResults, numSel);
-    }
+    return sortMergeCount;
+  }
 
-    if (noMatchCount > 0) {
-      numSel = generateOuterHashMapNoMatchResults(batch, noMatchs, noMatchCount, numSel);
-    }
+  /**
+   * Generate the outer join output results for one vectorized row batch.
+   *
+   * @param batch
+   *          The big table batch with any matching and any non matching rows both as
+   *          selected in use.
+   * @param allMatchCount
+   *          Number of matches in allMatchs.
+   * @param equalKeySeriesCount
+   *          Number of single value matches.
+   * @param atLeastOneNonMatch
+   *          Whether at least one row was a non-match.
+   * @param inputSelectedInUse
+   *          A copy of the batch's selectedInUse flag on input to the process method.
+   * @param inputLogicalSize
+   *          The batch's size on input to the process method.
+   * @param spillCount
+   *          Number of spills in spills.
+   * @param hashMapResultCount
+   *          Number of entries in hashMapResults.
+   */
+  public void finishOuter(VectorizedRowBatch batch,
+      int allMatchCount, int equalKeySeriesCount, boolean atLeastOneNonMatch,
+      boolean inputSelectedInUse, int inputLogicalSize,
+      int spillCount, int hashMapResultCount) throws IOException, HiveException {
 
+    // Get rid of spills before we start modifying the batch.
     if (spillCount > 0) {
       spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults,
           spills, spillHashMapResultIndices, spillCount);
     }
 
-    return numSel;
-  }
-
-   /**
-    * Generate the matching outer join output results for one row of a vectorized row batch into
-    * the overflow batch.
-    *
-    * @param batch
-    *          The big table batch.
-    * @param batchIndex
-    *          Index of the big table row.
-    * @param hashMapResult
-    *          The hash map result with the small table values.
-    */
-   private void copyOuterHashMapResultToOverflow(VectorizedRowBatch batch, int batchIndex,
-               VectorMapJoinHashMapResult hashMapResult) throws HiveException, IOException {
-
-     // if (hashMapResult.isCappedCountAvailable()) {
-     //   LOG.info("copyOuterHashMapResultToOverflow cappedCount " + hashMapResult.cappedCount());
-     // }
-     ByteSegmentRef byteSegmentRef = hashMapResult.first();
-     while (byteSegmentRef != null) {
-
-       // Copy the BigTable values into the overflow batch. Since the overflow batch may
-       // not get flushed here, we must copy by value.
-       if (bigTableRetainedVectorCopy != null) {
-         bigTableRetainedVectorCopy.copyByValue(batch, batchIndex,
-                                                overflowBatch, overflowBatch.size);
-       }
-
-       // Reference the keys we just copied above.
-       if (bigTableVectorCopyOuterKeys != null) {
-         bigTableVectorCopyOuterKeys.copyByReference(overflowBatch, overflowBatch.size,
-                                                     overflowBatch, overflowBatch.size);
-       }
-
-       if (smallTableVectorDeserializeRow != null) {
-
-         byte[] bytes = byteSegmentRef.getBytes();
-         int offset = (int) byteSegmentRef.getOffset();
-         int length = byteSegmentRef.getLength();
-         smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
-
-         smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.size);
-       }
+    int noMatchCount = 0;
+    if (spillCount > 0) {
 
-       ++overflowBatch.size;
-       if (overflowBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
-         forwardOverflow();
-       }
+      // Subtract the spills to get all match and non-match rows.
+      int nonSpillCount = subtractFromInputSelected(
+              inputSelectedInUse, inputLogicalSize, spills, spillCount, nonSpills);
 
-       byteSegmentRef = hashMapResult.next();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finishOuter spillCount > 0" +
+            " nonSpills " + intArrayToRangesString(nonSpills, nonSpillCount));
+      }
+  
+      // Big table value expressions apply to ALL matching and non-matching rows.
+      if (bigTableValueExpressions != null) {
+  
+        doValueExpr(batch, nonSpills, nonSpillCount);
+  
       }
-     // LOG.info("copyOuterHashMapResultToOverflow overflowBatch.size " + overflowBatch.size);
+  
+      if (atLeastOneNonMatch) {
+        noMatchCount = subtract(nonSpills, nonSpillCount, allMatchs, allMatchCount,
+                noMatchs);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter spillCount > 0" +
+              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount));
+        }
 
-   }
+      }
+    } else {
 
-   /**
-    * Generate the matching outer join output results for one vectorized row batch.
-    *
-    * For each matching row specified by parameter, get the one or more small table values and
-    * form join results.
-    *
-    * (Note: Since all matching and non-matching rows are selected and output for outer joins,
-    * we cannot use selected as the matching rows).
-    *
-    * @param batch
-    *          The big table batch with any matching and any non matching rows both as
-    *          selected in use.
-    * @param matchs
-    *          A subset of the rows of the batch that are matches.
-    * @param matchHashMapResultIndices
-    *          For each entry in matches, the index into the hashMapResult.
-    * @param matchSize
-    *          Number of matches in matchs.
-    * @param hashMapResults
-    *          The array of all hash map results for the batch.
-    * @param numSel
-    *          The current count of rows in the rebuilding of the selected array.
-    *
-    * @return
-    *          The new count of selected rows.
-    */
-   protected int generateOuterHashMapMatchResults(VectorizedRowBatch batch,
-       int[] matchs, int[] matchHashMapResultIndices, int matchSize,
-       VectorMapJoinHashMapResult[] hashMapResults, int numSel)
-               throws IOException, HiveException {
+      // Run value expressions over original (whole) input batch.
+      doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize);
 
-     int[] selected = batch.selected;
+      if (atLeastOneNonMatch) {
+        noMatchCount = subtractFromInputSelected(
+            inputSelectedInUse, inputLogicalSize, allMatchs, allMatchCount, noMatchs);
 
-     // Generate result within big table batch when single small table value.  Otherwise, copy
-     // to overflow batch.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter spillCount == 0" +
+              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount));
+        }
+      }
+    }
 
-     for (int i = 0; i < matchSize; i++) {
-       int batchIndex = matchs[i];
+    // When we generate results into the overflow batch, we may still end up with fewer rows
+    // in the big table batch.  So, nulSel and the batch's selected array will be rebuilt with
+    // just the big table rows that need to be forwarded, minus any rows processed with the
+    // overflow batch.
+    if (allMatchCount > 0) {
+
+      int numSel = 0;
+      for (int i = 0; i < equalKeySeriesCount; i++) {
+        int hashMapResultIndex = equalKeySeriesHashMapResultIndices[i];
+        VectorMapJoinHashMapResult hashMapResult = hashMapResults[hashMapResultIndex];
+        int allMatchesIndex = equalKeySeriesAllMatchIndices[i];
+        boolean isSingleValue = equalKeySeriesIsSingleValue[i];
+        int duplicateCount = equalKeySeriesDuplicateCounts[i];
+
+        if (isSingleValue) {
+          numSel = generateHashMapResultSingleValue(
+                      batch, hashMapResult, allMatchs, allMatchesIndex, duplicateCount, numSel);
+        } else {
+          generateHashMapResultMultiValue(
+              batch, hashMapResult, allMatchs, allMatchesIndex, duplicateCount);
+        }
+      }
 
-       int hashMapResultIndex = matchHashMapResultIndices[i];
-       VectorMapJoinHashMapResult hashMapResult = hashMapResults[hashMapResultIndex];
+      // The number of single value rows that were generated in the big table batch.
+      batch.size = numSel;
+      batch.selectedInUse = true;
 
-       if (!hashMapResult.isSingleRow()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finishOuter allMatchCount > 0" +
+            " batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+      }
 
-         // Multiple small table rows require use of the overflow batch.
-         copyOuterHashMapResultToOverflow(batch, batchIndex, hashMapResult);
-       } else {
+    } else {
+      batch.size = 0;
+    }
 
-         // Generate join result in big table batch.
-         ByteSegmentRef byteSegmentRef = hashMapResult.first();
+    if (noMatchCount > 0) {
+      if (batch.size > 0) {
+
+        generateOuterNulls(batch, noMatchs, noMatchCount);
+  
+        // Merge noMatchs and (match) selected.
+        int mergeCount = sortMerge(
+                noMatchs, noMatchCount, batch.selected, batch.size, merged);
+    
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter noMatchCount > 0 && batch.size > 0" +
+              " merged " + intArrayToRangesString(merged, mergeCount));
+        }
 
-         if (bigTableVectorCopyOuterKeys != null) {
-           bigTableVectorCopyOuterKeys.copyByReference(batch, batchIndex, batch, batchIndex);
-         }
+        System.arraycopy(merged, 0, batch.selected, 0, mergeCount);
+        batch.size = mergeCount;
+        batch.selectedInUse = true;
+      } else {
 
-         if (smallTableVectorDeserializeRow != null) {
+        // We can use the whole batch for output of no matches.
 
-           byte[] bytes = byteSegmentRef.getBytes();
-           int offset = (int) byteSegmentRef.getOffset();
-           int length = byteSegmentRef.getLength();
-           smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
+        generateOuterNullsRepeatedAll(batch);
 
-           smallTableVectorDeserializeRow.deserializeByValue(batch, batchIndex);
-         }
+        System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount);
+        batch.size = noMatchCount;
+        batch.selectedInUse = true;
 
-         // Remember this big table row was used for an output result.
-         selected[numSel++] = batchIndex;
-       }
-     }
-     return numSel;
-   }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter noMatchCount > 0 && batch.size == 0" +
+              " batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+        }
+      }
+    }
+  }
 
    /**
     * Generate the non matching outer join output results for one vectorized row batch.
@@ -412,72 +557,30 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
     *          A subset of the rows of the batch that are non matches.
     * @param noMatchSize
     *          Number of non matches in noMatchs.
-    * @param numSel
-    *          The current count of rows in the rebuilding of the selected array.
-    *
-    * @return
-    *          The new count of selected rows.
     */
-   protected int generateOuterHashMapNoMatchResults(VectorizedRowBatch batch, int[] noMatchs,
-       int noMatchSize, int numSel) throws IOException, HiveException {
-     int[] selected = batch.selected;
-
-     // Generate result within big table batch with null small table results, using isRepeated
-     // if possible.
+   protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
+       int noMatchSize) throws IOException, HiveException {
 
-     if (numSel == 0) {
+     // Set null information in the small table results area.
 
-       // There were 0 matching rows -- so we can use the isRepeated optimization for the non
-       // matching rows.
+     for (int i = 0; i < noMatchSize; i++) {
+       int batchIndex = noMatchs[i];
 
        // Mark any scratch small table scratch columns that would normally receive a copy of the
-       // key as null and repeating.
+       // key as null, too.
        for (int column : bigTableOuterKeyOutputVectorColumns) {
          ColumnVector colVector = batch.cols[column];
-         colVector.isRepeating = true;
          colVector.noNulls = false;
-         colVector.isNull[0] = true;
+         colVector.isNull[batchIndex] = true;
        }
 
-       // Small table values are set to null and repeating.
+       // Small table values are set to null.
        for (int column : smallTableOutputVectorColumns) {
          ColumnVector colVector = batch.cols[column];
-         colVector.isRepeating = true;
          colVector.noNulls = false;
-         colVector.isNull[0] = true;
-       }
-
-       // Rebuild the selected array.
-       for (int i = 0; i < noMatchSize; i++) {
-         int batchIndex = noMatchs[i];
-         selected[numSel++] = batchIndex;
-       }
-     } else {
-
-       // Set null information in the small table results area.
-
-       for (int i = 0; i < noMatchSize; i++) {
-         int batchIndex = noMatchs[i];
-
-         // Mark any scratch small table scratch columns that would normally receive a copy of the
-         // key as null, too.
-         for (int column : bigTableOuterKeyOutputVectorColumns) {
-           ColumnVector colVector = batch.cols[column];
-           colVector.noNulls = false;
-           colVector.isNull[batchIndex] = true;
-         }
-
-         // Small table values are set to null.
-         for (int column : smallTableOutputVectorColumns) {
-           ColumnVector colVector = batch.cols[column];
-           colVector.noNulls = false;
-           colVector.isNull[batchIndex] = true;
-         }
-
-         selected[numSel++] = batchIndex;
+         colVector.isNull[batchIndex] = true;
        }
      }
-     return numSel;
    }
 
   /**
@@ -492,65 +595,114 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
    *          The hash map lookup result for the repeated key.
    * @param hashMapResults
    *          The array of all hash map results for the batch.
+   * @param someRowsFilteredOut
+   *          Whether some rows of the repeated key batch were knocked out by the filter.
+   * @param inputSelectedInUse
+   *          A copy of the batch's selectedInUse flag on input to the process method.
+   * @param inputLogicalSize
+   *          The batch's size on input to the process method.
    * @param scratch1
    *          Pre-allocated storage to internal use.
+   * @param scratch2
+   *          Pre-allocated storage to internal use.
    */
-  public int finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
-      VectorMapJoinHashMapResult hashMapResult, int[] scratch1)
+  public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
+      VectorMapJoinHashMapResult hashMapResult, boolean someRowsFilteredOut,
+      boolean inputSelectedInUse, int inputLogicalSize)
           throws IOException, HiveException {
 
-    int numSel = 0;
+    // LOG.debug("finishOuterRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size + " someRowsFilteredOut " + someRowsFilteredOut);
 
-    if (joinResult == JoinUtil.JoinResult.MATCH && bigTableFilterExpressions.length > 0) {
+    switch (joinResult) {
+    case MATCH:
 
-      // Since it is repeated, the evaluation of the filter will knock the whole batch out.
-      // But since we are doing outer join, we want to keep non-matches.
+      // Rows we looked up as one repeated key are a match.  But filtered out rows
+      // need to be generated as non-matches, too.
 
-      // First, remember selected;
-      int[] rememberSelected = scratch1;
-      int rememberBatchSize = batch.size;
-      if (batch.selectedInUse) {
-        System.arraycopy(batch.selected, 0, rememberSelected, 0, batch.size);
-      }
+      if (someRowsFilteredOut) {
 
-      // Filter.
-      for (VectorExpression ve : bigTableFilterExpressions) {
-        ve.evaluate(batch);
-      }
+        // For the filtered out rows that didn't (logically) get looked up in the hash table,
+        // we need to generate no match results for those too...
 
-      // Convert a filter out to a non match.
-      if (batch.size == 0) {
-        joinResult = JoinUtil.JoinResult.NOMATCH;
-        if (batch.selectedInUse) {
-          System.arraycopy(rememberSelected, 0, batch.selected, 0, rememberBatchSize);
-          // LOG.info("finishOuterRepeated batch #" + batchCounter + " filter out converted to no matchs " +
-          //     Arrays.toString(Arrays.copyOfRange(batch.selected, 0, rememberBatchSize)));
-        } else {
-          // LOG.info("finishOuterRepeated batch #" + batchCounter + " filter out converted to no matchs batch size " +
-          //     rememberBatchSize);
-        }
-        batch.size = rememberBatchSize;
-      }
-    }
+        // Run value expressions over original (whole) input batch.
+        doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize);
 
-    // LOG.info("finishOuterRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size);
-    switch (joinResult) {
-    case MATCH:
-      // Run our value expressions over whole batch.
-      if (bigTableValueExpressions != null) {
-        for(VectorExpression ve: bigTableValueExpressions) {
-          ve.evaluate(batch);
+        // Now calculate which rows were filtered out (they are logically no matches).
+
+        // Determine which rows are non matches by determining the delta between inputSelected and
+        // (current) batch selected.
+
+        int noMatchCount = subtractFromInputSelected(
+                inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs);
+
+        generateOuterNulls(batch, noMatchs, noMatchCount);
+
+        // Now generate the matchs.  Single small table values will be put into the big table
+        // batch and come back in matchs.  Any multiple small table value results will go into
+        // the overflow batch.
+        generateHashMapResultRepeatedAll(batch, hashMapResult);
+
+        // Merge noMatchs and (match) selected.
+        int mergeCount = sortMerge(
+                noMatchs, noMatchCount, batch.selected, batch.size, merged);
+
+        System.arraycopy(merged, 0, batch.selected, 0, mergeCount);
+        batch.size = mergeCount;
+        batch.selectedInUse = true;
+      } else {
+
+        // Just run our value expressions over input batch.
+
+        if (bigTableValueExpressions != null) {
+          for(VectorExpression ve: bigTableValueExpressions) {
+            ve.evaluate(batch);
+          }
         }
-      }
 
-      // Use a common method applicable for inner and outer.
-      numSel = generateHashMapResultRepeatedAll(batch, hashMapResult);
+        generateHashMapResultRepeatedAll(batch, hashMapResult);
+      }
       break;
+
     case SPILL:
-      // Whole batch is spilled.
+
+      // Rows we looked up as one repeated key need to spill.  But filtered out rows
+      // need to be generated as non-matches, too.
+
       spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMapResult);
+
+      // After using selected to generate spills, generate non-matches, if any.
+      if (someRowsFilteredOut) {
+
+        // Determine which rows are non matches by determining the delta between inputSelected and
+        // (current) batch selected.
+
+        int noMatchCount = subtractFromInputSelected(
+                inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs);
+
+        System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount);
+        batch.size = noMatchCount;
+        batch.selectedInUse = true;
+
+        generateOuterNullsRepeatedAll(batch);
+      } else {
+        batch.size = 0;
+      }
+
       break;
+
     case NOMATCH:
+
+      if (someRowsFilteredOut) {
+
+        // When the repeated no match is due to filtering, we need to restore the
+        // selected information.
+
+        if (inputSelectedInUse) {
+          System.arraycopy(inputSelected, 0, batch.selected, 0, inputLogicalSize);
+        }
+        batch.size = inputLogicalSize;
+      }
+
       // Run our value expressions over whole batch.
       if (bigTableValueExpressions != null) {
         for(VectorExpression ve: bigTableValueExpressions) {
@@ -558,11 +710,9 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
         }
       }
 
-      numSel = generateOuterNullsRepeatedAll(batch);
+      generateOuterNullsRepeatedAll(batch);
       break;
     }
-
-    return numSel;
   }
 
   /**
@@ -573,24 +723,8 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
    *
    * @param batch
    *          The big table batch.
-   * @return
-   *          The new count of selected rows.
    */
-  protected int generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException {
-
-    int[] selected = batch.selected;
-    boolean selectedInUse = batch.selectedInUse;
-
-    // Generate result within big table batch using is repeated for null small table results.
-
-    if (batch.selectedInUse) {
-      // The selected array is already filled in as we want it.
-    } else {
-      for (int i = 0; i < batch.size; i++) {
-        selected[i] = i;
-      }
-      batch.selectedInUse = true;
-    }
+  protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException {
 
     for (int column : smallTableOutputVectorColumns) {
       ColumnVector colVector = batch.cols[column];
@@ -607,12 +741,5 @@ public abstract class VectorMapJoinOuterGenerateResultOperator
       colVector.isNull[0] = true;
       colVector.isRepeating = true;
     }
-
-    // for (int i = 0; i < batch.size; i++) {
-    //   int bigTableIndex = selected[i];
-    //   VectorizedBatchUtil.debugDisplayOneRow(batch, bigTableIndex, taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator generate generateOuterNullsRepeatedAll batch");
-    // }
-
-    return batch.size;
   }
 }
\ No newline at end of file


Mime
View raw message