drill-commits mailing list archives

From: ve...@apache.org
Subject: [2/5] drill git commit: DRILL-1991: Code indentation and formatting cleanup for a few files
Date: Fri, 16 Jan 2015 23:14:56 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/10fd9e10/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5deb67f..f370dc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
@@ -49,7 +49,6 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -57,118 +56,105 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.eigenbase.rel.JoinRelType;
 
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.List;
 
 public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+  // Probe side record batch
+  private final RecordBatch left;
 
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
-    // Probe side record batch
-    private final RecordBatch left;
-
-    // Build side record batch
-    private final RecordBatch right;
+  // Build side record batch
+  private final RecordBatch right;
 
-    // Join type, INNER, LEFT, RIGHT or OUTER
-    private final JoinRelType joinType;
+  // Join type, INNER, LEFT, RIGHT or OUTER
+  private final JoinRelType joinType;
 
-    // Join conditions
-    private final List<JoinCondition> conditions;
+  // Join conditions
+  private final List<JoinCondition> conditions;
 
-    // Runtime generated class implementing HashJoinProbe interface
-    private HashJoinProbe hashJoinProbe = null;
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
 
-    /* Helper class
-     * Maintains linked list of build side records with the same key
-     * Keeps information about which build records have a corresponding
-     * matching key in the probe side (for outer, right joins)
-     */
-    private HashJoinHelper hjHelper = null;
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
 
-    // Underlying hashtable used by the hash join
-    private HashTable hashTable = null;
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
 
-    /* Hyper container to store all build side record batches.
-     * Records are retrieved from this container when there is a matching record
-     * on the probe side
-     */
-    private ExpandableHyperContainer hyperContainer;
+  /* Hyper container to store all build side record batches.
+   * Records are retrieved from this container when there is a matching record
+   * on the probe side
+   */
+  private ExpandableHyperContainer hyperContainer;
 
-    // Number of records in the output container
-    private int outputRecords;
+  // Number of records in the output container
+  private int outputRecords;
 
-    // Current batch index on the build side
-    private int buildBatchIndex = 0;
+  // Current batch index on the build side
+  private int buildBatchIndex = 0;
 
-    // Schema of the build side
-    private BatchSchema rightSchema = null;
+  // Schema of the build side
+  private BatchSchema rightSchema = null;
 
 
+  // Generator mapping for the build side
+  // Generator mapping for the build side : scalar
+  private static final GeneratorMapping PROJECT_BUILD =
+      GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */,
+          null /* cleanup */);
+  // Generator mapping for the build side : constant
+  private static final GeneratorMapping PROJECT_BUILD_CONSTANT =
+      GeneratorMapping.create("doSetup"/* setup method */, "doSetup" /* eval method */, null /* reset */,
+          null /* cleanup */);
 
-    // Generator mapping for the build side
-    // Generator mapping for the build side : scalar
-    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
-                                                                                  "projectBuildRecord" /* eval method */,
-                                                                                  null /* reset */, null /* cleanup */);
-    // Generator mapping for the build side : constant
-    private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */,
-                                                                                  "doSetup" /* eval method */,
-                                                                                   null /* reset */, null /* cleanup */);
+  // Generator mapping for the probe side : scalar
+  private static final GeneratorMapping PROJECT_PROBE =
+      GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval
method */, null /* reset */,
+          null /* cleanup */);
+  // Generator mapping for the probe side : constant
+  private static final GeneratorMapping PROJECT_PROBE_CONSTANT =
+      GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */,
null /* reset */,
+          null /* cleanup */);
 
-    // Generator mapping for the probe side : scalar
-    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup"
/* setup method */,
-                                                                                  "projectProbeRecord"
/* eval method */,
-                                                                                  null /*
reset */, null /* cleanup */);
-    // Generator mapping for the probe side : constant
-    private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup"
/* setup method */,
-                                                                                  "doSetup"
/* eval method */,
-                                                                                  null /*
reset */, null /* cleanup */);
 
+  // Mapping set for the build side
+  private final MappingSet projectBuildMapping =
+      new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */, "buildBatch"
/* read container */,
+          "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD);
 
-    // Mapping set for the build side
-    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index
*/, "outIndex" /* write index */,
-                                                                  "buildBatch" /* read container
*/,
-                                                                  "outgoing" /* write container
*/,
-                                                                  PROJECT_BUILD_CONSTANT,
PROJECT_BUILD);
+  // Mapping set for the probe side
+  private final MappingSet projectProbeMapping =
+      new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */, "probeBatch"
/* read container */,
+          "outgoing" /* write container */, PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
 
-    // Mapping set for the probe side
-    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index
*/, "outIndex" /* write index */,
-                                                                  "probeBatch" /* read container
*/,
-                                                                  "outgoing" /* write container
*/,
-                                                                  PROJECT_PROBE_CONSTANT,
PROJECT_PROBE);
+  IterOutcome leftUpstream = IterOutcome.NONE;
+  IterOutcome rightUpstream = IterOutcome.NONE;
 
-    // indicates if we have previously returned an output batch
-    boolean firstOutputBatch = true;
-
-    IterOutcome leftUpstream = IterOutcome.NONE;
-    IterOutcome rightUpstream = IterOutcome.NONE;
-
-    private final HashTableStats htStats = new HashTableStats();
+  private final HashTableStats htStats = new HashTableStats();
 
   public enum Metric implements MetricDef {
 
-      NUM_BUCKETS,
-      NUM_ENTRIES,
-      NUM_RESIZING,
-      RESIZING_TIME;
+    NUM_BUCKETS,
+    NUM_ENTRIES,
+    NUM_RESIZING,
+    RESIZING_TIME;
 
-      // duplicate for hash ag
-
-      @Override
-      public int metricId() {
-        return ordinal();
-      }
-    }
+    // duplicate for hash ag
 
     @Override
-    public int getRecordCount() {
-        return outputRecords;
+    public int metricId() {
+      return ordinal();
     }
+  }
 
-
+  @Override
+  public int getRecordCount() {
+    return outputRecords;
+  }
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
@@ -204,333 +190,332 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
   }
 
-    @Override
-    public IterOutcome innerNext() {
-        try {
-            /* If we are here for the first time, execute the build phase of the
-             * hash join and setup the run time generated class for the probe side
-             */
-            if (state == BatchState.FIRST) {
-                // Build the hash table, using the build side record batches.
-                executeBuildPhase();
-//                IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left);
-                hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable, hjHelper, joinType);
-
-                // Update the hash table related stats for the operator
-                updateStats(this.hashTable);
-            }
+  @Override
+  public IterOutcome innerNext() {
+    try {
+      /* If we are here for the first time, execute the build phase of the
+       * hash join and setup the run time generated class for the probe side
+       */
+      if (state == BatchState.FIRST) {
+        // Build the hash table, using the build side record batches.
+        executeBuildPhase();
+        //                IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left);
+        hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable,
+            hjHelper, joinType);
+
+        // Update the hash table related stats for the operator
+        updateStats(this.hashTable);
+      }
 
-            // Store the number of records projected
-            if (hashTable != null
-                || joinType != JoinRelType.INNER) {
-
-                // Allocate the memory for the vectors in the output container
-                allocateVectors();
-
-                outputRecords = hashJoinProbe.probeAndProject();
-
-                /* We are here because of one the following
-                 * 1. Completed processing of all the records and we are done
-                 * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
-                 * Either case build the output container's schema and return
-                 */
-                if (outputRecords > 0 || state == BatchState.FIRST) {
-                  if (state == BatchState.FIRST) {
-                    state = BatchState.NOT_FIRST;
-                  }
-
-
-                  for (VectorWrapper<?> v : container) {
-                    v.getValueVector().getMutator().setValueCount(outputRecords);
-                  }
-
-                  return IterOutcome.OK;
-                }
-            } else {
-                // Our build side is empty, we won't have any matches, clear the probe side
-                if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                    for (VectorWrapper<?> wrapper : left) {
-                      wrapper.getValueVector().clear();
-                    }
-                    left.kill(true);
-                    leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
-                    while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                      for (VectorWrapper<?> wrapper : left) {
-                        wrapper.getValueVector().clear();
-                      }
-                      leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
-                    }
-                }
-            }
+      // Store the number of records projected
+      if (hashTable != null || joinType != JoinRelType.INNER) {
 
-            // No more output records, clean up and return
-            state = BatchState.DONE;
-//            if (first) {
-//              return IterOutcome.OK_NEW_SCHEMA;
-//            }
-            return IterOutcome.NONE;
-        } catch (ClassTransformationException | SchemaChangeException | IOException e) {
-            context.fail(e);
-            killIncoming(false);
-            return IterOutcome.STOP;
-        }
-    }
+        // Allocate the memory for the vectors in the output container
+        allocateVectors();
 
-    public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+        outputRecords = hashJoinProbe.probeAndProject();
 
-        // Setup the hash table configuration object
-        int conditionsSize = conditions.size();
+        /* We are here because of one the following
+         * 1. Completed processing of all the records and we are done
+         * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+         * Either case build the output container's schema and return
+         */
+        if (outputRecords > 0 || state == BatchState.FIRST) {
+          if (state == BatchState.FIRST) {
+            state = BatchState.NOT_FIRST;
+          }
 
-        NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
-        NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
 
-        // Create named expressions from the conditions
-        for (int i = 0; i < conditionsSize; i++) {
-            rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
-            leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+          for (VectorWrapper<?> v : container) {
+            v.getValueVector().getMutator().setValueCount(outputRecords);
+          }
 
-            // Hash join only supports equality currently.
-            assert conditions.get(i).getRelationship().equals("==");
+          return IterOutcome.OK;
         }
-
-        // Set the left named expression to be null if the probe batch is empty.
-        if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
-            leftExpr = null;
-        } else {
-          if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-            throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+      } else {
+        // Our build side is empty, we won't have any matches, clear the probe side
+        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+          for (VectorWrapper<?> wrapper : left) {
+            wrapper.getValueVector().clear();
+          }
+          left.kill(true);
+          leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
+          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+            for (VectorWrapper<?> wrapper : left) {
+              wrapper.getValueVector().clear();
+            }
+            leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
           }
         }
+      }
+
+      // No more output records, clean up and return
+      state = BatchState.DONE;
+      //            if (first) {
+      //              return IterOutcome.OK_NEW_SCHEMA;
+      //            }
+      return IterOutcome.NONE;
+    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+      context.fail(e);
+      killIncoming(false);
+      return IterOutcome.STOP;
+    }
+  }
+
+  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+    // Setup the hash table configuration object
+    int conditionsSize = conditions.size();
 
-        HashTableConfig htConfig =
-            new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
+    NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+    NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+    // Create named expressions from the conditions
+    for (int i = 0; i < conditionsSize; i++) {
+      rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i));
+      leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+      // Hash join only supports equality currently.
+      assert conditions.get(i).getRelationship().equals("==");
+    }
+
+    // Set the left named expression to be null if the probe batch is empty.
+    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+      leftExpr = null;
+    } else {
+      if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+      }
+    }
+
+    HashTableConfig htConfig =
+        new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
             HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
-        // Create the chained hash table
-        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(),
-            this.right, this.left, null, false /* nulls are not equal */);
-        hashTable = ht.createAndSetupHashTable(null);
+    // Create the chained hash table
+    ChainedHashTable ht =
+        new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null,
+            false /* nulls are not equal */);
+    hashTable = ht.createAndSetupHashTable(null);
+  }
+
+  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+
+    //Setup the underlying hash table
+
+    // skip first batch if count is zero, as it may be an empty schema batch
+    if (right.getRecordCount() == 0) {
+      for (VectorWrapper w : right) {
+        w.clear();
+      }
+      rightUpstream = next(right);
     }
 
-    public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+    boolean moreData = true;
+
+    while (moreData) {
+
+      switch (rightUpstream) {
+
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        moreData = false;
+        continue;
 
-        //Setup the underlying hash table
+      case OK_NEW_SCHEMA:
+        if (rightSchema == null) {
+          rightSchema = right.getSchema();
 
-      // skip first batch if count is zero, as it may be an empty schema batch
-        if (right.getRecordCount() == 0) {
-          for (VectorWrapper w : right) {
-            w.clear();
+          if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+            throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
           }
-          rightUpstream = next(right);
+          setupHashTable();
+        } else {
+          if (!rightSchema.equals(right.getSchema())) {
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          }
+          hashTable.updateBatches();
         }
+        // Fall through
+      case OK:
+        int currentRecordCount = right.getRecordCount();
 
-        boolean moreData = true;
-
-        while (moreData) {
-
-            switch (rightUpstream) {
-
-                case NONE:
-                case NOT_YET:
-                case STOP:
-                    moreData = false;
-                    continue;
-
-                case OK_NEW_SCHEMA:
-                    if (rightSchema == null) {
-                        rightSchema = right.getSchema();
-
-                        if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-                          throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
-                        }
-                        setupHashTable();
-                    } else {
-                      if (!rightSchema.equals(right.getSchema())) {
-                        throw new SchemaChangeException("Hash join does not support schema
changes");
-                      }
-                      hashTable.updateBatches();
-                    }
-                // Fall through
-                case OK:
-                    int currentRecordCount = right.getRecordCount();
-
-                    /* For every new build batch, we store some state in the helper context
-                     * Add new state to the helper context
-                     */
-                    hjHelper.addNewBatch(currentRecordCount);
-
-                    // Holder contains the global index where the key is hashed into using the hash table
-                    IndexPointer htIndex = new IndexPointer();
-
-                    // For every record in the build batch , hash the key columns
-                    for (int i = 0; i < currentRecordCount; i++) {
-
-                        HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry count */);
-
-                        if (status != HashTable.PutStatus.PUT_FAILED) {
-                            /* Use the global index returned by the hash table, to store
-                             * the current record index and batch index. This will be used
-                             * later when we probe and find a match.
-                             */
-                            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-                        }
-                    }
-
-                    /* Completed hashing all records in this batch. Transfer the batch
-                     * to the hyper vector container. Will be used when we want to retrieve
-                     * records that have matching keys on the probe side.
-                     */
-                    RecordBatchData nextBatch = new RecordBatchData(right);
-                    if (hyperContainer == null) {
-                        hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
-                    } else {
-                        hyperContainer.addBatch(nextBatch.getContainer());
-                    }
-
-                    // completed processing a batch, increment batch index
-                    buildBatchIndex++;
-                    break;
-            }
-            // Get the next record batch
-            rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
-        }
-    }
+        /* For every new build batch, we store some state in the helper context
+         * Add new state to the helper context
+         */
+        hjHelper.addNewBatch(currentRecordCount);
+
+        // Holder contains the global index where the key is hashed into using the hash table
+        IndexPointer htIndex = new IndexPointer();
 
-    public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+        // For every record in the build batch , hash the key columns
+        for (int i = 0; i < currentRecordCount; i++) {
 
+          HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry count */);
 
+          if (status != HashTable.PutStatus.PUT_FAILED) {
+            /* Use the global index returned by the hash table, to store
+             * the current record index and batch index. This will be used
+             * later when we probe and find a match.
+             */
+            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+          }
+        }
 
-        final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
-        ClassGenerator<HashJoinProbe> g = cg.getRoot();
+        /* Completed hashing all records in this batch. Transfer the batch
+         * to the hyper vector container. Will be used when we want to retrieve
+         * records that have matching keys on the probe side.
+         */
+        RecordBatchData nextBatch = new RecordBatchData(right);
+        if (hyperContainer == null) {
+          hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+        } else {
+          hyperContainer.addBatch(nextBatch.getContainer());
+        }
 
-        // Generate the code to project build side records
-        g.setMappingSet(projectBuildMapping);
+        // completed processing a batch, increment batch index
+        buildBatchIndex++;
+        break;
+      }
+      // Get the next record batch
+      rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
+    }
+  }
 
+  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
 
-        int fieldId = 0;
-        JExpression buildIndex = JExpr.direct("buildIndex");
-        JExpression outIndex = JExpr.direct("outIndex");
-        g.rotateBlock();
 
-        if (rightSchema != null) {
-            for(MaterializedField field : rightSchema) {
+    final CodeGenerator<HashJoinProbe> cg =
+        CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashJoinProbe> g = cg.getRoot();
 
-                MajorType inputType = field.getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+    // Generate the code to project build side records
+    g.setMappingSet(projectBuildMapping);
 
-                // Add the vector to our output container
-//                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
-                container.addOrGet(MaterializedField.create(field.getPath(), outputType));
 
-                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(),
true, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType,
false, fieldId));
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
-                  .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-                  .arg(outIndex)
-                  .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+    int fieldId = 0;
+    JExpression buildIndex = JExpr.direct("buildIndex");
+    JExpression outIndex = JExpr.direct("outIndex");
+    g.rotateBlock();
 
-                fieldId++;
-            }
+    if (rightSchema != null) {
+      for (MaterializedField field : rightSchema) {
+
+        MajorType inputType = field.getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
         }
-        g.rotateBlock();
-        g.getEvalBlock()._return(JExpr.TRUE);
 
-        // Generate the code to project probe side records
-        g.setMappingSet(projectProbeMapping);
+        // Add the vector to our output container
+        //                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(),
+        // outputType), context.getAllocator());
+        container.addOrGet(MaterializedField.create(field.getPath(), outputType));
 
-        int outputFieldId = fieldId;
-        fieldId = 0;
-        JExpression probeIndex = JExpr.direct("probeIndex");
-        int recordCount = 0;
+        JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(),
true, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType,
false, fieldId));
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(buildIndex.band(JExpr.lit((int)
Character.MAX_VALUE)))
+            .arg(outIndex).arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
 
-        if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
-            for (VectorWrapper<?> vv : left) {
+        fieldId++;
+      }
+    }
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.TRUE);
 
-                MajorType inputType = vv.getField().getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+    // Generate the code to project probe side records
+    g.setMappingSet(projectProbeMapping);
 
-                ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
-                if (v instanceof AbstractContainerVector) {
-                  vv.getValueVector().makeTransferPair(v);
-                  v.clear();
-                }
+    int outputFieldId = fieldId;
+    fieldId = 0;
+    JExpression probeIndex = JExpr.direct("probeIndex");
 
-                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType,
false, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType,
false, outputFieldId));
+    if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      for (VectorWrapper<?> vv : left) {
 
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
 
-                fieldId++;
-                outputFieldId++;
-            }
-            recordCount = left.getRecordCount();
+        ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType));
+        if (v instanceof AbstractContainerVector) {
+          vv.getValueVector().makeTransferPair(v);
+          v.clear();
         }
-        g.rotateBlock();
-        g.getEvalBlock()._return(JExpr.TRUE);
 
-        HashJoinProbe hj = context.getImplementationClass(cg);
+        JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType,
false, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType,
false, outputFieldId));
 
-        return hj;
-    }
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()
+            ._return(JExpr.FALSE);
 
-    private void allocateVectors() {
-      for(VectorWrapper<?> v : container) {
-        v.getValueVector().allocateNew();
+        fieldId++;
+        outputFieldId++;
       }
     }
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.TRUE);
 
-    public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-        super(popConfig, context, true);
-        this.left = left;
-        this.right = right;
-        this.joinType = popConfig.getJoinType();
-        this.conditions = popConfig.getConditions();
-    }
+    HashJoinProbe hj = context.getImplementationClass(cg);
 
-    private void updateStats(HashTable htable) {
-      if (htable == null) {
-        return;
-      }
-      htable.getStats(htStats);
-      this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-      this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-      this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-      this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    return hj;
+  }
+
+  private void allocateVectors() {
+    for (VectorWrapper<?> v : container) {
+      v.getValueVector().allocateNew();
     }
+  }
 
-    @Override
-    public void killIncoming(boolean sendUpstream) {
-        this.left.kill(sendUpstream);
-        this.right.kill(sendUpstream);
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left,
+      RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, true);
+    this.left = left;
+    this.right = right;
+    this.joinType = popConfig.getJoinType();
+    this.conditions = popConfig.getConditions();
+  }
+
+  private void updateStats(HashTable htable) {
+    if (htable == null) {
+      return;
     }
+    htable.getStats(htStats);
+    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+  }
 
-    @Override
-    public void cleanup() {
-        if (hjHelper != null) {
-          hjHelper.clear();
-        }
+  @Override
+  public void killIncoming(boolean sendUpstream) {
+    this.left.kill(sendUpstream);
+    this.right.kill(sendUpstream);
+  }
 
-        // If we didn't receive any data, hyperContainer may be null, check before clearing
-        if (hyperContainer != null) {
-            hyperContainer.clear();
-        }
+  @Override
+  public void cleanup() {
+    if (hjHelper != null) {
+      hjHelper.clear();
+    }
 
-        if (hashTable != null) {
-            hashTable.clear();
-        }
-        super.cleanup();
-        right.cleanup();
-        left.cleanup();
+    // If we didn't receive any data, hyperContainer may be null, check before clearing
+    if (hyperContainer != null) {
+      hyperContainer.clear();
     }
 
+    if (hashTable != null) {
+      hashTable.clear();
+    }
+    super.cleanup();
+    right.cleanup();
+    left.cleanup();
+  }
 }
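
The commit above is an indentation and formatting cleanup of Drill's hash join operator, so the logic of HashJoinBatch is unchanged. For readers skimming the diff, the following is a minimal, hypothetical sketch of the build/probe pattern the class implements, written against plain java.util collections rather than Drill's value vectors, ChainedHashTable, and runtime-generated projection code; every name in it is illustrative and not part of the Drill API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: an in-memory inner hash join keyed on equality. */
public class HashJoinSketch {
  public static void main(String[] args) {
    // Build side (right input) and probe side (left input); column 0 is the join key.
    List<String[]> build = List.of(
        new String[] {"k1", "b1"}, new String[] {"k1", "b2"}, new String[] {"k2", "b3"});
    List<String[]> probe = List.of(
        new String[] {"k1", "p1"}, new String[] {"k3", "p2"});

    // Build phase: index every build-side row by its key, keeping a list per key
    // so duplicate keys survive -- the role HashJoinHelper's linked lists of
    // build records play in the real operator.
    Map<String, List<String[]>> table = new HashMap<>();
    for (String[] row : build) {
      table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
    }

    // Probe phase: for each probe-side row, look up its key and emit one output
    // row per matching build row (inner join; LEFT/RIGHT/FULL variants also emit
    // rows for unmatched keys, which is why the real helper tracks matched build
    // records).
    for (String[] p : probe) {
      for (String[] b : table.getOrDefault(p[0], List.of())) {
        System.out.println(p[0] + " -> build=" + b[1] + ", probe=" + p[1]);
      }
    }
  }
}

In the real operator the build phase additionally transfers each incoming build batch into an ExpandableHyperContainer, so that matching build-side records can be copied into the outgoing batch during the probe phase by the runtime-generated HashJoinProbe class.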

