drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Date Fri, 25 Aug 2017 23:32:56 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r135369225
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
---
    @@ -39,88 +35,107 @@
     import org.apache.drill.exec.expr.ValueVectorWriteExpression;
     import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.config.UnionAll;
    -import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
     import org.apache.drill.exec.record.BatchSchema;
     import org.apache.drill.exec.record.MaterializedField;
     import org.apache.drill.exec.record.RecordBatch;
     import org.apache.drill.exec.record.TransferPair;
     import org.apache.drill.exec.record.TypedFieldId;
     import org.apache.drill.exec.record.VectorWrapper;
    -import org.apache.drill.exec.record.WritableBatch;
    -import org.apache.drill.exec.record.selection.SelectionVector2;
    -import org.apache.drill.exec.record.selection.SelectionVector4;
     import org.apache.drill.exec.resolver.TypeCastRules;
     import org.apache.drill.exec.vector.AllocationHelper;
     import org.apache.drill.exec.vector.FixedWidthVector;
     import org.apache.drill.exec.vector.SchemaChangeCallBack;
     import org.apache.drill.exec.vector.ValueVector;
     
    -import com.google.common.collect.Lists;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Stack;
     
    -public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
    +public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
       private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
     
    -  private List<MaterializedField> outputFields;
    +  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
       private UnionAller unionall;
    -  private UnionAllInput unionAllInput;
    -  private RecordBatch current;
    -
       private final List<TransferPair> transfers = Lists.newArrayList();
    -  private List<ValueVector> allocationVectors;
    -  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
    +  private List<ValueVector> allocationVectors = Lists.newArrayList();
       private int recordCount = 0;
    -  private boolean schemaAvailable = false;
    +  private UnionInputIterator unionInputIterator;
     
       public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext
context) throws OutOfMemoryException {
    -    super(config, context, false);
    -    assert (children.size() == 2) : "The number of the operands of Union must be 2";
    -    unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
    -  }
    -
    -  @Override
    -  public int getRecordCount() {
    -    return recordCount;
    +    super(config, context, true, children.get(0), children.get(1));
       }
     
       @Override
       protected void killIncoming(boolean sendUpstream) {
    -    unionAllInput.getLeftRecordBatch().kill(sendUpstream);
    -    unionAllInput.getRightRecordBatch().kill(sendUpstream);
    +    left.kill(sendUpstream);
    +    right.kill(sendUpstream);
       }
     
    -  @Override
    -  public SelectionVector2 getSelectionVector2() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection
vector");
    -  }
    +  protected void buildSchema() throws SchemaChangeException {
    +    if (! prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
     
    -  @Override
    -  public SelectionVector4 getSelectionVector4() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection
vector");
    +    unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
    +
    +    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA)
{
    +      inferOutputFieldsOneSide(right.getSchema());
    +    } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA)
{
    +      inferOutputFieldsOneSide((left.getSchema()));
    +    } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream ==
IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
    +    }
    +
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    for (VectorWrapper vv: container) {
    +      vv.getValueVector().allocateNew();
    +      vv.getValueVector().getMutator().setValueCount(0);
    +    }
       }
     
       @Override
       public IterOutcome innerNext() {
         try {
    -      IterOutcome upstream = unionAllInput.nextBatch();
    -      logger.debug("Upstream of Union-All: {}", upstream);
    +      if (!unionInputIterator.hasNext()) {
    +        return IterOutcome.NONE;
    +      }
    +
    +      Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
    +
    +      IterOutcome upstream = nextBatch.left;
    +      RecordBatch incoming = nextBatch.right;
    +
    +      // skip batches with same schema as the previous one yet having 0 row.
    +      if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
    +        do {
    +          for (final VectorWrapper<?> w : incoming) {
    +            w.clear();
    +          }
    --- End diff --
    
    `VectorAccessibleUtilities.clear(incoming)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message