drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6323) Lateral Join - Initial implementation
Date Tue, 17 Apr 2018 18:06:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441277#comment-16441277
] 

ASF GitHub Bot commented on DRILL-6323:
---------------------------------------

Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182172836
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
---
    @@ -0,0 +1,861 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.base.LateralContract;
    +import org.apache.drill.exec.physical.config.LateralJoinPOP;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.JoinBatchMemoryManager;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.RecordBatchSizer;
    +import org.apache.drill.exec.record.VectorAccessibleUtilities;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
    +
/**
 * RecordBatch implementation for the lateral join operator. Currently it's expected that
 * LATERAL co-exists with the UNNEST operator. Both LATERAL and UNNEST share a contract
 * with each other defined at {@link LateralContract}.
 */
public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);

  // Input indexes used to correctly update the stats for the left/right incoming batches
  private static final int LEFT_INPUT = 0;

  private static final int RIGHT_INPUT = 1;

  // Maximum number of records in the outgoing batch; recomputed per output batch
  // by the batch memory manager (see constructor and updateMemoryManager)
  private int maxOutputRowCount;

  // Schema of the current left incoming batch
  private BatchSchema leftSchema;

  // Schema of the current right incoming batch
  private BatchSchema rightSchema;

  // Index in the output batch at which the next joined row will be populated
  private int outputIndex;

  // Current index of the record in the left incoming batch being processed;
  // -1 indicates a new left batch is needed
  private int leftJoinIndex = -1;

  // Current index of the record in the right incoming batch being processed;
  // -1 indicates a new right batch is needed
  private int rightJoinIndex = -1;

  // Flag to keep track if the current left batch needs to be processed in a future
  // next() call (e.g. OK_NEW_SCHEMA arrived while the outgoing batch already had rows)
  private boolean processLeftBatchInFuture;

  // Keeps track if any matching right record was found for the current left index
  // record; needed for LEFT join semantics when a left row spans multiple right batches
  private boolean matchedRecordFound;

  // Allows tests to bypass the memory-manager-driven output row count
  private boolean useMemoryManager = true;
    +
    +  /* ****************************************************************************************************************
    +   * Public Methods
    +   * ****************************************************************************************************************/
    +  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
    +                          RecordBatch left, RecordBatch right) throws OutOfMemoryException
{
    +    super(popConfig, context, left, right);
    +    Preconditions.checkNotNull(left);
    +    Preconditions.checkNotNull(right);
    +    final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    +    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
    +
    +    // Initially it's set to default value of 64K and later for each new output row it
will be set to the computed
    +    // row count
    +    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
    +  }
    +
  /**
   * Gets the left and right incoming batches and produces the output batch. If the left
   * incoming batch is empty then next() on the right branch is not called and an empty
   * batch with the correct outcome is returned. If a non-empty left incoming batch is
   * received then it calls next() on the right branch to get an incoming batch and
   * finally produces the output.
   *
   * @return IterOutcome state of the lateral join batch
   */
  @Override
  public IterOutcome innerNext() {

    // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
    IterOutcome childOutcome = processLeftBatch();

    // Reset this state after calling processLeftBatch above, which consumed it
    processLeftBatchInFuture = false;

    // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state
    // returned from the left side is a terminal state, then just return the IterOutcome and don't call next()
    // on the right branch
    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
      container.setRecordCount(0);
      return childOutcome;
    }

    // Left side has some records in the batch so let's process right batch
    childOutcome = processRightBatch();

    // Reset the left & right outcomes to OK here and send the empty batch downstream.
    // Assumption being the right side will always send OK_NEW_SCHEMA with an empty batch, which is what UNNEST will do
    if (childOutcome == OK_NEW_SCHEMA) {
      // Preserve EMIT on the left so the EMIT boundary is not lost across this schema change
      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
      rightUpstream = OK;
      return childOutcome;
    }

    if (isTerminalOutcome(childOutcome)) {
      return childOutcome;
    }

    // If OK_NEW_SCHEMA is seen only on a non-empty left batch but not on the right batch, then we should set up
    // the schema in the output container based on the new left schema and old right schema. If the schema change
    // failed then return STOP downstream
    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
      return STOP;
    }

    // Setup the references of left, right and outgoing container in generated operator
    state = BatchState.NOT_FIRST;

    // Update the memory manager with the sizes of the current incoming batches
    updateMemoryManager(LEFT_INPUT);
    updateMemoryManager(RIGHT_INPUT);

    // Allocate space for the outgoing batch
    allocateVectors();

    return produceOutputBatch();
  }
    +
  /**
   * Publishes the batch memory manager statistics before releasing operator
   * resources via the superclass.
   */
  @Override
  public void close() {
    updateBatchMemoryManagerStats();
    super.close();
  }
    +
  /**
   * Returns the number of records currently held in the output container.
   */
  @Override
  public int getRecordCount() {
    return container.getRecordCount();
  }
    +
    +  /**
    +   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator
of Lateral
    +   * to process the records at leftJoinIndex.
    +   *
    +   * @return - RecordBatch received as left side incoming
    +   */
    +  @Override
    +  public RecordBatch getIncoming() {
    +    Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected
since right side will only be " +
    +      "called iff there is any valid left batch");
    +    return left;
    +  }
    +
    +  /**
    +   * Returns the current row index which the calling operator should process in current
incoming left record batch.
    +   * LATERAL should never return it as -1 since that indicated current left batch is
empty and LATERAL will never
    +   * call next on right side with empty left batch
    +   *
    +   * @return - int - index of row to process.
    +   */
    +  @Override
    +  public int getRecordIndex() {
    +    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
    +      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount()));
    +    return leftJoinIndex;
    +  }
    +
  /**
   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}
   * for the left incoming batch. Used by UNNEST (via {@link LateralContract}) to decide
   * how to react to a kill coming from the right branch.
   */
  @Override
  public IterOutcome getLeftOutcome() {
    return leftUpstream;
  }
    +
    +  /* ****************************************************************************************************************
    +   * Protected Methods
    +   * ****************************************************************************************************************/
    +
    +  /**
    +   * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}.
If left batch sees a
    +   * failure outcome then we don't even call next on right branch, since there is no
left incoming.
    +   * @return true if both the left/right batch was received without failure outcome.
    +   *         false if either of batch is received with failure outcome.
    +   */
    +  @Override
    +  protected boolean prefetchFirstBatchFromBothSides() {
    +    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first
batch
    +    leftUpstream = next(0, left);
    +
    +    boolean validBatch = setBatchState(leftUpstream);
    +
    +    if (validBatch) {
    +      rightUpstream = next(1, right);
    +      validBatch = setBatchState(rightUpstream);
    +    }
    +
    +    // EMIT outcome is not expected as part of first batch from either side
    +    if (leftUpstream == EMIT || rightUpstream == EMIT) {
    +      state = BatchState.STOP;
    +      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from
left or right side in " +
    +        "buildSchema phase");
    +    }
    +    return validBatch;
    +  }
    +
  /**
   * Prefetches a batch from the left and right branches to learn the schema of each side,
   * then adds value vectors to the output container based on those schemas. For this
   * phase LATERAL always expects an empty batch from the right side, which UNNEST should
   * abide by.
   *
   * @throws SchemaChangeException if batch schema was changed during execution
   */
  @Override
  protected void buildSchema() throws SchemaChangeException {
    // Prefetch a RecordBatch from both left and right branch
    if (!prefetchFirstBatchFromBothSides()) {
      return;
    }
    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");

    // Update the record memory manager with both prefetched batches
    updateMemoryManager(LEFT_INPUT);
    updateMemoryManager(RIGHT_INPUT);

    // Setup output container schema based on known left and right schema
    setupNewSchema();

    // Release the vectors received from the right side; only its schema was needed
    VectorAccessibleUtilities.clear(right);

    // Set join index as invalid (-1) if the left side is empty, else set it to 0
    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
    rightJoinIndex = -1;

    // Reset both sides of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly
    // by the buildSchema caller and we should treat the batch as received with OK outcome.
    leftUpstream = OK;
    rightUpstream = OK;
  }
    +
  /**
   * Propagates the kill to both incoming branches.
   *
   * The left outcome is reset to STOP before killing the right side because, as part of
   * the right-side kill, UNNEST will ask LATERAL for the IterOutcome of the left incoming
   * and based on that it can decide whether the kill is coming from downstream of LATERAL
   * or upstream of LATERAL — e.g. a LIMIT operator present downstream of LATERAL, versus
   * upstream of LATERAL and downstream of UNNEST.
   *
   * @param sendUpstream whether the kill should be propagated further upstream
   */
  @Override
  protected void killIncoming(boolean sendUpstream) {
    this.left.kill(sendUpstream);
    // Must be set BEFORE killing the right side — see the javadoc above for why
    leftUpstream = STOP;
    this.right.kill(sendUpstream);
  }
    +
    +  /* ****************************************************************************************************************
    +   * Private Methods
    +   * ****************************************************************************************************************/
    +
    +  private boolean handleSchemaChange() {
    +    try {
    +      stats.startSetup();
    +      logger.debug(String.format("Setting up new schema based on incoming batch. Old
output schema: %s",
    +        container.getSchema()));
    +      setupNewSchema();
    +      return true;
    +    } catch (SchemaChangeException ex) {
    +      logger.error("Failed to handle schema change hence killing the query");
    +      context.getExecutorState().fail(ex);
    +      left.kill(true); // Can have exchange receivers on left so called with true
    +      right.kill(false); // Won't have exchange receivers on right side
    +      return false;
    +    } finally {
    +      stats.stopSetup();
    +    }
    +  }
    +
    +  private boolean isTerminalOutcome(IterOutcome outcome) {
    +    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
    +  }
    +
  /**
   * Processes the left incoming batch with different
   * {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}s. It is called from the
   * main {@link LateralJoinBatch#innerNext()} block with each next() call from the
   * upstream operator, and also while populating the outgoing container to get the next
   * left batch when the current one is fully processed. It calls next() on the left side
   * until we get a non-empty RecordBatch OR we get one of
   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcomes.
   *
   * @return IterOutcome after processing the current left batch
   */
  private IterOutcome processLeftBatch() {

    boolean needLeftBatch = leftJoinIndex == -1;

    // Loop until we have a non-empty left batch or hit a returning outcome below
    while (needLeftBatch) {
      // If a batch was deferred from a previous call (processLeftBatchInFuture), reuse
      // the stored outcome instead of pulling a new batch
      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
      final boolean emptyLeftBatch = left.getRecordCount() <=0;
      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);

      switch (leftUpstream) {
        case OK_NEW_SCHEMA:
          // This OK_NEW_SCHEMA is received post build-schema phase and from the left side.
          // If the schema didn't actually change then just handle it as an OK outcome. This is fine since
          // setupNewSchema does not hold incoming vector references; record copies always work on the
          // latest incoming vectors.
          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));

            // Schema didn't change; if the batch is also empty, get the next batch and lose
            // the OK_NEW_SCHEMA outcome
            processLeftBatchInFuture = false;
            if (emptyLeftBatch) {
              continue;
            } else {
              leftUpstream = OK;
            }
          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
            // There are already records from the previous join inside the outgoing batch.
            // Pass those downstream first and handle this OK_NEW_SCHEMA in the subsequent next() call.
            processLeftBatchInFuture = true;
            return OK_NEW_SCHEMA;
          }

          // If the left batch is empty with an actual schema change then just rebuild the output
          // container and send an empty batch downstream
          if (emptyLeftBatch) {
            if (handleSchemaChange()) {
              leftJoinIndex = -1;
              return OK_NEW_SCHEMA;
            } else {
              return STOP;
            }
          } // else - setup the new schema information after getting it from right side too.
          // NOTE: intentional fall-through into OK — a non-empty OK_NEW_SCHEMA batch is
          // treated like an OK batch for index setup
        case OK:
          // With OK outcome we will keep calling next until we get a batch with >0 records
          if (emptyLeftBatch) {
            leftJoinIndex = -1;
            continue;
          } else {
            leftJoinIndex = 0;
          }
          break;
        case EMIT:
          // Don't call next on the right batch when the EMIT boundary arrives empty
          if (emptyLeftBatch) {
            leftJoinIndex = -1;
            return EMIT;
          } else {
            leftJoinIndex = 0;
          }
          break;
        case OUT_OF_MEMORY:
        case NONE:
        case STOP:
          // Not resetting outputIndex to 0 since if the outgoing container is empty then
          // there is no point returning anything
          if (outputIndex > 0) { // can only reach here from produceOutputBatch
            processLeftBatchInFuture = true;
          }
          return leftUpstream;
        case NOT_YET:
          // Back off briefly before polling the left branch again.
          // NOTE(review): interrupt status is not restored here — TODO confirm intended
          try {
            Thread.sleep(5);
          } catch (InterruptedException ex) {
            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
              "received NOT_YET");
          }
          break;
      }
      needLeftBatch = leftJoinIndex == -1;
    }
    return leftUpstream;
  }
    +
    +  /**
    +   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call
from upstream operator and if
    +   * left batch has some records in it. Also when we populate the outgoing container
then this method is called to
    +   * get next right batch if current one is fully processed.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processRightBatch() {
    +    // Check if we still have records left to process in left incoming from new batch
or previously half processed
    +    // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex
correctly. Like for new
    +    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully
processed then it will be set
    +    // to -1.
    +    // Whereas rightJoinIndex is to keep track of record in right batch being joined
with record in left batch.
    +    // So when there are cases such that all records in right batch is not consumed by
the output, then rightJoinIndex
    +    // will be a valid index. When all records are consumed it will be set to -1.
    +    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex ==
-1);
    +    while (needNewRightBatch) {
    +      rightUpstream = next(RIGHT_INPUT, right);
    +      switch (rightUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming
batch. So there won't be a
    +          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA
--> OK/EMIT fall through
    +          //
    +          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so
let's pass the new schema
    +          // downstream and later with subsequent next() call the join output will be
produced
    +          Preconditions.checkState(right.getRecordCount() == 0,
    +            "Right side batch with OK_NEW_SCHEMA is not empty");
    +
    +          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
    +            logger.warn(String.format("New schema received from right side is same as
previous known right schema. " +
    +              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
    +              rightSchema, right.getSchema()));
    +            continue;
    +          }
    +          if (handleSchemaChange()) {
    +            container.setRecordCount(0);
    +            rightJoinIndex = -1;
    +            return OK_NEW_SCHEMA;
    +          } else {
    +            return STOP;
    +          }
    +        case OK:
    +        case EMIT:
    +          // Even if there are no records we should not call next() again because in
case of LEFT join empty batch is
    +          // of importance too
    +          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
    +          needNewRightBatch = false;
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          needNewRightBatch = false;
    +          break;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(10);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch
of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +    }
    +    return rightUpstream;
    +  }
    +
  /**
   * Gets the current left and right incoming batches and does the cross join to fill the
   * output batch. If all the records in either or both batches are consumed, it gets the
   * next batch from that branch depending upon whether the output batch still has space
   * left. If the output batch is full then the output is finalized to be sent downstream.
   * Subsequent calls know how to consume previously half-consumed (if any) batches and
   * produce output using them.
   *
   * @return IterOutcome to be sent along with the output batch to the downstream operator
   */
  private IterOutcome produceOutputBatch() {

    boolean isLeftProcessed = false;

    // Try to fully pack the outgoing container
    while (!isOutgoingBatchFull()) {
      final int previousOutputCount = outputIndex;
      // Invoke the runtime generated method to emit records in the output batch for each leftJoinIndex
      crossJoinAndOutputRecords();

      // We have produced some records in the outgoing container, hence a match was found for the left record
      if (outputIndex > previousOutputCount) {
        // Need this extra flag since there can be a LEFT join case where, for the current leftJoinIndex, we
        // receive a right batch with data, then an empty batch, and again another empty batch with EMIT
        // outcome. If we just used outputIndex we would lose the information that a few rows for
        // leftJoinIndex were already produced using the first right batch
        matchedRecordFound = true;
      }

      // One right batch might span across multiple output batches, so rightJoinIndex is the moving sum of
      // all the output records for this right batch until it is fully consumed.
      //
      // Also one output batch can contain records from 2 different right batches, hence rightJoinIndex
      // should move by the number of output records produced from the current right batch only.
      rightJoinIndex += outputIndex - previousOutputCount;
      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();

      // Check if the join output above was based on an empty right batch, or whether it resulted in the
      // right side batch being fully consumed. In this scenario, only if rightUpstream is EMIT do we
      // advance leftJoinIndex; otherwise records of the given right batch remain to be processed.
      if (isRightProcessed) {
        if (rightUpstream == EMIT) {
          if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
            // No right match for this left row: copy the left side alone (LEFT join semantics)
            emitLeft(leftJoinIndex, outputIndex++);
          }
          ++leftJoinIndex;
          // Reset matchedRecordFound for the next left index record
          matchedRecordFound = false;
        }

        // Release vectors of the right batch. This happens for both rightUpstream = EMIT/OK
        VectorAccessibleUtilities.clear(right);
        rightJoinIndex = -1;
      }

      // Check if the previous left record was the last one; if so set leftJoinIndex to -1
      isLeftProcessed = leftJoinIndex >= left.getRecordCount();
      if (isLeftProcessed) {
        leftJoinIndex = -1;
        VectorAccessibleUtilities.clear(left);
      }

      // Check if the output batch still has some space
      if (!isOutgoingBatchFull()) {
        // Check if the left side still has records or not
        if (isLeftProcessed) {
          // If the current left batch came with EMIT/OK_NEW_SCHEMA outcome, then return the output to the
          // downstream layer before getting the next batch
          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
            break;
          } else {
            logger.debug("Output batch still has some space left, getting new batches from left and right");
            // Get both the left batch and the right batch and make sure indexes are properly set
            leftUpstream = processLeftBatch();

            if (processLeftBatchInFuture) {
              logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
                "batch and process the new batch in subsequent next call", leftUpstream);
              // We should return the current output batch with OK outcome and not reset leftUpstream
              finalizeOutputContainer();
              return OK;
            }

            // If the left batch received a terminal outcome then don't call the right batch
            if (isTerminalOutcome(leftUpstream)) {
              finalizeOutputContainer();
              return leftUpstream;
            }

            // If we have received the left batch with EMIT outcome and it is empty then we should return the
            // previous output batch with EMIT outcome
            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
              isLeftProcessed = true;
              break;
            }

            // Update the batch memory manager to use the new left incoming batch
            updateMemoryManager(LEFT_INPUT);
          }
        }

        // If we are here it means one of the below:
        // 1) Either the previous left batch was not fully processed and it came with OK outcome. There is
        // still some space left in the outgoing batch, so let's get the next right batch.
        // 2) OR the previous left & right batches were fully processed with OK outcome. There is space in
        // the outgoing batch and we have got a new left batch with OK outcome. Let's get the next right batch.
        //
        // It will not hit OK_NEW_SCHEMA since the left side has not seen that outcome
        rightUpstream = processRightBatch();
        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change " +
          "in right branch");

        if (isTerminalOutcome(rightUpstream)) {
          finalizeOutputContainer();
          return rightUpstream;
        }

        // Update the batch memory manager to use the new right incoming batch
        updateMemoryManager(RIGHT_INPUT);
      }
    } // output batch is full to its max capacity

    finalizeOutputContainer();

    // Check whether the output batch was full and left was fully consumed or not. If left is not consumed
    // entirely but the output batch is full, then even when the left batch came with EMIT outcome we should
    // send this output batch with OK outcome, not with EMIT. Whereas if the output is full and left is also
    // fully consumed, then we should send the EMIT outcome.
    if (leftUpstream == EMIT && isLeftProcessed) {
      logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
        "consumed in output batch");
      return EMIT;
    }

    if (leftUpstream == OK_NEW_SCHEMA) {
      // Return the output batch with OK_NEW_SCHEMA and reset the state to OK
      logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" +
        " of batches");
      leftUpstream = OK;
      return OK_NEW_SCHEMA;
    }
    return OK;
  }
    +
    +  /**
    +   * Finalizes the current output container with the records produced so far before sending
it downstream
    +   */
    +  private void finalizeOutputContainer() {
    +    VectorAccessibleUtilities.setValueCount(container, outputIndex);
    +
    +    // Set the record count in the container
    +    container.setRecordCount(outputIndex);
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    batchMemoryManager.updateOutgoingStats(outputIndex);
    +    logger.debug("Number of records emitted: " + outputIndex);
    +
    +    // Update the output index for next output batch to zero
    +    outputIndex = 0;
    +  }
    +
  /**
   * Checks whether the schema changed between the provided newSchema and oldSchema.
   * It relies on {@link BatchSchema#isEquivalent(BatchSchema)}.
   *
   * @param newSchema new schema information
   * @param oldSchema previously known schema information to compare against
   * @return true if newSchema is not the same as oldSchema (a null on either side
   *         counts as changed); false if the schemas are equivalent
   */
  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
    return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema);
  }
    +
    +  /**
    +   * Validate if the input schema is not null and doesn't contain any Selection Vector.
    +   * @param schema - input schema to verify
    +   * @return - true: valid input schema
    +   *           false: invalid input schema
    +   */
    +  private boolean verifyInputSchema(BatchSchema schema) {
    +
    +    boolean isValid = true;
    +    if (schema == null) {
    +      logger.error("Null schema found for the incoming batch");
    +      isValid = false;
    +    } else {
    +      final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode();
    +      if (svMode != BatchSchema.SelectionVectorMode.NONE) {
    +        logger.error("Incoming batch schema found with selection vector which is not
supported. SVMode: {}",
    +          svMode.toString());
    +        isValid = false;
    +      }
    +    }
    +    return isValid;
    +  }
    +
    +  /**
    +   * Helps to create the outgoing container vectors based on known left and right batch
schemas
    +   * @throws SchemaChangeException
    +   */
    +  private void setupNewSchema() throws SchemaChangeException {
    +
    +    logger.debug(String.format("Setting up new schema based on incoming batch. New left
schema: %s" +
    +        " and New right schema: %s", left.getSchema(), right.getSchema()));
    +
    +    // Clear up the container
    +    container.clear();
    +    leftSchema = left.getSchema();
    +    rightSchema = right.getSchema();
    +
    +    if (!verifyInputSchema(leftSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
    +    }
    +
    +    if (!verifyInputSchema(rightSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
    +    }
    +
    --- End diff --
    
    Should we add a check here to guard that there is no duplication of names between right
and left inputs?


> Lateral Join - Initial implementation
> -------------------------------------
>
>                 Key: DRILL-6323
>                 URL: https://issues.apache.org/jira/browse/DRILL-6323
>             Project: Apache Drill
>          Issue Type: Task
>            Reporter: Parth Chandra
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>
> Implementation of Lateral Join with unit tests using MockRecordBatch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message