drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5546) Schema change problems caused by empty batch
Date Wed, 30 Aug 2017 18:57:01 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147839#comment-16147839 ]

ASF GitHub Bot commented on DRILL-5546:
---------------------------------------

Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r136159563
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---
    @@ -130,562 +145,248 @@ public IterOutcome innerNext() {
       }
     
       @Override
    -  public WritableBatch getWritableBatch() {
    -    return WritableBatch.get(this);
    +  public int getRecordCount() {
    +    return recordCount;
       }
     
    -  private void setValueCount(int count) {
    -    for (ValueVector v : allocationVectors) {
    -      ValueVector.Mutator m = v.getMutator();
    -      m.setValueCount(count);
    -    }
    -  }
     
    -  private boolean doAlloc() {
    -    for (ValueVector v : allocationVectors) {
    -      try {
    -        AllocationHelper.allocateNew(v, current.getRecordCount());
    -      } catch (OutOfMemoryException ex) {
    -        return false;
    -      }
    +  @SuppressWarnings("resource")
    +  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
    +    if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) {
    +      // wrong.
         }
    -    return true;
    -  }
     
    -  @SuppressWarnings("resource")
    -  private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
    -    if (allocationVectors != null) {
    -      for (ValueVector v : allocationVectors) {
    -        v.clear();
    -      }
    +    if (newSchema) {
    +      createUnionAller(inputBatch);
         }
     
    -    allocationVectors = Lists.newArrayList();
    -    transfers.clear();
    +    container.zeroVectors();
     
    -    // If both sides of Union-All are empty
    -    if(unionAllInput.isBothSideEmpty()) {
    -      for(int i = 0; i < outputFields.size(); ++i) {
    -        final String colName = outputFields.get(i).getPath();
    -        final MajorType majorType = MajorType.newBuilder()
    -            .setMinorType(MinorType.INT)
    -            .setMode(DataMode.OPTIONAL)
    -            .build();
    -
    -        MaterializedField outputField = MaterializedField.create(colName, majorType);
    -        ValueVector vv = container.addOrGet(outputField, callBack);
    -        allocationVectors.add(vv);
    -      }
    +    for (final ValueVector v : this.allocationVectors) {
    +      AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
    +    }
     
    -      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +    recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
    +    for (final ValueVector v : allocationVectors) {
    +      final ValueVector.Mutator m = v.getMutator();
    +      m.setValueCount(recordCount);
    +    }
    +
    +    if (callBack.getSchemaChangedAndReset()) {
           return IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      return IterOutcome.OK;
         }
    +  }
    +
    +  private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
    +    transfers.clear();
    +    allocationVectors.clear();
     
         final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
         cg.getCodeGenerator().plainJavaCapable(true);
         // Uncomment out this line to debug the generated code.
    -//    cg.getCodeGenerator().saveCodeForDebugging(true);
    +    //    cg.getCodeGenerator().saveCodeForDebugging(true);
    +
         int index = 0;
    -    for(VectorWrapper<?> vw : current) {
    -       ValueVector vvIn = vw.getValueVector();
    -      // get the original input column names
    -      SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath());
    -      // get the renamed column names
    -      SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getPath());
    +    for(VectorWrapper<?> vw : inputBatch) {
    +      ValueVector vvIn = vw.getValueVector();
    +      ValueVector vvOut = container.getValueVector(index).getValueVector();
     
           final ErrorCollector collector = new ErrorCollectorImpl();
           // According to input data names, Minortypes, Datamodes, choose to
           // transfer directly,
           // rename columns or
           // cast data types (Minortype or DataMode)
    -      if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
    +      if (hasSameTypeAndMode(container.getSchema().getColumn(index), vvIn.getField())
    +          && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
    +          ) {
             // Transfer column
    -
    -        MajorType outputFieldType = outputFields.get(index).getType();
    -        MaterializedField outputField = MaterializedField.create(outputPath.getAsUnescapedPath(), outputFieldType);
    -
    -        /*
    -          todo: Fix if condition when DRILL-4824 is merged
    -          If condition should be changed to:
    -          `if (outputFields.get(index).getPath().equals(inputPath.getAsUnescapedPath())) {`
    -          DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
    -          Root cause is missing indication of child column in map types when it is null.
    -          DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
    -          Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
    -          Unit test - TestJsonReader.testKvgenWithUnionAll().
    -         */
    -        if (outputFields.get(index).getPath().equals(inputPath)) {
    -          ValueVector vvOut = container.addOrGet(outputField);
    -          TransferPair tp = vvIn.makeTransferPair(vvOut);
    -          transfers.add(tp);
    +        TransferPair tp = vvIn.makeTransferPair(vvOut);
    +        transfers.add(tp);
             // Copy data in order to rename the column
    -        } else {
    -          final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
    -          if (collector.hasErrors()) {
    -            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
    -          }
    -
    -          ValueVector vv = container.addOrGet(outputField, callBack);
    -          allocationVectors.add(vv);
    -          TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getPath()));
    -          ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
    -          cg.addExpr(write);
    -        }
    -      // Cast is necessary
    +      } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
    +        continue;
           } else {
    -        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
    +        SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath());
    +        MaterializedField inField = vvIn.getField();
    +        MaterializedField outputField = vvOut.getField();
    +
    +        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
    +
             if (collector.hasErrors()) {
              throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
             }
     
             // If the inputs' DataMode is required and the outputs' DataMode is not required
             // cast to the one with the least restriction
    -        if(vvIn.getField().getType().getMode() == DataMode.REQUIRED
    -            && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
    -          expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
    +        if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
    +            && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
    +          expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
               if (collector.hasErrors()) {
                 throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
             }
     
             // If two inputs' MinorTypes are different,
             // Insert a cast before the Union operation
    -        if(vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
    -          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
    +        if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
    +          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
               if (collector.hasErrors()) {
                 throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
             }
     
    -        final MaterializedField outputField = MaterializedField.create(outputPath.getAsUnescapedPath(), expr.getMajorType());
    -        ValueVector vector = container.addOrGet(outputField, callBack);
    -        allocationVectors.add(vector);
             TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getPath()));
     
    -        boolean useSetSafe = !(vector instanceof FixedWidthVector);
    +        boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
             cg.addExpr(write);
    +
    +        allocationVectors.add(vvOut);
           }
           ++index;
         }
     
         unionall = context.getImplementationClass(cg.getCodeGenerator());
    -    unionall.setup(context, current, this, transfers);
    -
    -    if(!schemaAvailable) {
    -      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    -      schemaAvailable = true;
    -    }
    -
    -    if(!doAlloc()) {
    -      return IterOutcome.OUT_OF_MEMORY;
    -    }
    -
    -    recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
    -    setValueCount(recordCount);
    -    return IterOutcome.OK;
    -  }
    -
    -  public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
    -    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
    -        && (leftField.getType().getMode() == rightField.getType().getMode());
    -  }
    -
    -  // This method is used by inner class to point the reference `current` to the correct record batch
    -  private void setCurrentRecordBatch(RecordBatch target) {
    -    this.current = target;
    -  }
    -
    -  // This method is used by inner class to clear the current record batch
    -  private void clearCurrentRecordBatch() {
    -    for(VectorWrapper<?> v: current) {
    -      v.clear();
    -    }
    +    unionall.setup(context, inputBatch, this, transfers);
       }
     
    -  public static class UnionAllInput {
    -    private UnionAllRecordBatch unionAllRecordBatch;
    -    private List<MaterializedField> outputFields;
    -    private OneSideInput leftSide;
    -    private OneSideInput rightSide;
    -    private IterOutcome upstream = IterOutcome.NOT_YET;
    -    private boolean leftIsFinish = false;
    -    private boolean rightIsFinish = false;
    -
    -    // These two schemas are obtained from the first record batches of the left and right inputs
    -    // They are used to check if the schema is changed between recordbatches
    -    private BatchSchema leftSchema;
    -    private BatchSchema rightSchema;
    -    private boolean bothEmpty = false;
    -
    -    public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
    -      this.unionAllRecordBatch = unionAllRecordBatch;
    -      leftSide = new OneSideInput(left);
    -      rightSide = new OneSideInput(right);
    -    }
    -
    -    private void setBothSideEmpty(boolean bothEmpty) {
    -      this.bothEmpty = bothEmpty;
    -    }
    -
    -    private boolean isBothSideEmpty() {
    -      return bothEmpty;
    -    }
    -
    -    public IterOutcome nextBatch() throws SchemaChangeException {
    -      if(upstream == RecordBatch.IterOutcome.NOT_YET) {
    -        IterOutcome iterLeft = leftSide.nextBatch();
    -        switch(iterLeft) {
    -          case OK_NEW_SCHEMA:
    -            /*
    -             * If the first few record batches are all empty,
    -             * there is no way to tell whether these empty batches are coming from empty files.
    -             * It is incorrect to infer output types when either side could be coming from empty.
    -             *
    -             * Thus, while-loop is necessary to skip those empty batches.
    -             */
    -            whileLoop:
    -            while(leftSide.getRecordBatch().getRecordCount() == 0) {
    -              iterLeft = leftSide.nextBatch();
    -
    -              switch(iterLeft) {
    -                case STOP:
    -                case OUT_OF_MEMORY:
    -                  return iterLeft;
    -
    -                case NONE:
    -                  // Special Case: The left side was an empty input.
    -                  leftIsFinish = true;
    -                  break whileLoop;
    -
    -                case NOT_YET:
    -                case OK_NEW_SCHEMA:
    -                case OK:
    -                  continue whileLoop;
    -
    -                default:
    -                  throw new IllegalStateException(
    -                      String.format("Unexpected state %s.", iterLeft));
    -              }
    -            }
    -
    -            break;
    -          case STOP:
    -          case OUT_OF_MEMORY:
    -            return iterLeft;
    -
    -          default:
    -            throw new IllegalStateException(
    -                String.format("Unexpected state %s.", iterLeft));
    -        }
    -
    -        IterOutcome iterRight = rightSide.nextBatch();
    -        switch(iterRight) {
    -          case OK_NEW_SCHEMA:
    -            // Unless there is no record batch on the left side of the inputs,
    -            // always start processing from the left side.
    -            if(leftIsFinish) {
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -            } else {
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -            }
    -            // If the record count of the first batch from right input is zero,
    -            // there are two possibilities:
    -            // 1. The right side is an empty input (e.g., file).
    -            // 2. There will be more records carried by later batches.
    -
    -            /*
    -             * If the first few record batches are all empty,
    -             * there is no way to tell whether these empty batches are coming from empty files.
    -             * It is incorrect to infer output types when either side could be coming from empty.
    -             *
    -             * Thus, while-loop is necessary to skip those empty batches.
    -             */
    -            whileLoop:
    -            while(rightSide.getRecordBatch().getRecordCount() == 0) {
    -              iterRight = rightSide.nextBatch();
    -              switch(iterRight) {
    -                case STOP:
    -                case OUT_OF_MEMORY:
    -                  return iterRight;
    -
    -                case NONE:
    -                  // Special Case: The right side was an empty input.
    -                  rightIsFinish = true;
    -                  break whileLoop;
    -
    -                case NOT_YET:
    -                case OK_NEW_SCHEMA:
    -                case OK:
    -                  continue whileLoop;
    -
    -                default:
    -                  throw new IllegalStateException(
    -                      String.format("Unexpected state %s.", iterRight));
    -              }
    -            }
    -
    -            if(leftIsFinish && rightIsFinish) {
    -              setBothSideEmpty(true);
    -            }
    -
    -            inferOutputFields();
    -            break;
    -
    -          case STOP:
    -          case OUT_OF_MEMORY:
    -            return iterRight;
    -
    -          default:
    -            throw new IllegalStateException(
    -                String.format("Unexpected state %s.", iterRight));
    -        }
    -
    -
    -
    -        upstream = IterOutcome.OK_NEW_SCHEMA;
    -        return upstream;
    -      } else {
    -        if(isBothSideEmpty()) {
    -          return IterOutcome.NONE;
    -        }
    -
    -        unionAllRecordBatch.clearCurrentRecordBatch();
    -
    -        if(leftIsFinish && rightIsFinish) {
    -          upstream = IterOutcome.NONE;
    -          return upstream;
    -        } else if(leftIsFinish) {
    -          IterOutcome iterOutcome = rightSide.nextBatch();
    -
    -          switch(iterOutcome) {
    -            case NONE:
    -              rightIsFinish = true;
    -              // fall through
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK_NEW_SCHEMA:
    -              if(!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
    -                throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
    -              }
    -              iterOutcome = IterOutcome.OK;
    -              // fall through
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
    -          }
    -        } else if(rightIsFinish) {
    -          IterOutcome iterOutcome = leftSide.nextBatch();
    -          switch(iterOutcome) {
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -            case NONE:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
    -          }
    -        } else {
    -          IterOutcome iterOutcome = leftSide.nextBatch();
    -
    -          switch(iterOutcome) {
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK_NEW_SCHEMA:
    -              if(!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
    -                throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
    -              }
    -
    -              iterOutcome = IterOutcome.OK;
    -              // fall through
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case NONE:
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -              upstream = IterOutcome.OK;
    -              leftIsFinish = true;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
    -          }
    -        }
    -      }
    -    }
     
    -    /**
    -     *
    -     * Summarize the inference in the four different situations:
    -     * First of all, the field names are always determined by the left side
    -     * (Even when the left side is from an empty file, we have the column names.)
    -     *
    -     * Cases:
    -     * 1. Left: non-empty; Right: non-empty
    -     *      types determined by both sides with implicit casting involved
    -     * 2. Left: empty; Right: non-empty
    -     *      type from the right
    -     * 3. Left: non-empty; Right: empty
    -     *      types from the left
    -     * 4. Left: empty; Right: empty
    -     *      types are nullable integer
    -     */
    -    private void inferOutputFields() {
    -      if(!leftIsFinish && !rightIsFinish) {
    -        // Both sides are non-empty
    -        inferOutputFieldsBothSide();
    -      } else if(!rightIsFinish) {
    -        // Left side is non-empty
    -        // While use left side's column names as output column names,
    -        // use right side's column types as output column types.
    -        inferOutputFieldsFromSingleSide(
    -            leftSide.getRecordBatch().getSchema(),
    -            rightSide.getRecordBatch().getSchema());
    +  // The output table's column names always follow the left table,
    +  // where the output type is chosen based on DRILL's implicit casting rules
    +  private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
    +//    outputFields = Lists.newArrayList();
    +    final Iterator<MaterializedField> leftIter = leftSchema.iterator();
    +    final Iterator<MaterializedField> rightIter = rightSchema.iterator();
    +
    +    int index = 1;
    +    while (leftIter.hasNext() && rightIter.hasNext()) {
    +      MaterializedField leftField  = leftIter.next();
    +      MaterializedField rightField = rightIter.next();
    +
    +      if (hasSameTypeAndMode(leftField, rightField)) {
    +        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
    +        builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), builder.build()), callBack);
    +      } else if (Types.isUntypedNull(rightField.getType())) {
    +        container.addOrGet(leftField, callBack);
    +      } else if (Types.isUntypedNull(leftField.getType())) {
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), rightField.getType()), callBack);
           } else {
    -        // Either right side is empty or both are empty
    -        // Using left side's schema is sufficient
    -        inferOutputFieldsFromSingleSide(
    -            leftSide.getRecordBatch().getSchema(),
    -            leftSide.getRecordBatch().getSchema());
    -      }
    -    }
    -
    -    // The output table's column names always follow the left table,
    -    // where the output type is chosen based on DRILL's implicit casting rules
    -    private void inferOutputFieldsBothSide() {
    -      outputFields = Lists.newArrayList();
    -      leftSchema = leftSide.getRecordBatch().getSchema();
    -      rightSchema = rightSide.getRecordBatch().getSchema();
    -      Iterator<MaterializedField> leftIter = leftSchema.iterator();
    -      Iterator<MaterializedField> rightIter = rightSchema.iterator();
    -
    -      int index = 1;
    -      while (leftIter.hasNext() && rightIter.hasNext()) {
    -        MaterializedField leftField  = leftIter.next();
    -        MaterializedField rightField = rightIter.next();
    -
    -        if (hasSameTypeAndMode(leftField, rightField)) {
    -          MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
    +        // If the output type is not the same,
    +        // cast the column of one of the table to a data type which is the Least Restrictive
    +        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
    +        if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
    +          builder.setMinorType(leftField.getType().getMinorType());
               builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    -          outputFields.add(MaterializedField.create(leftField.getPath(), builder.build()));
             } else {
    -          // If the output type is not the same,
    -          // cast the column of one of the table to a data type which is the Least Restrictive
    -          MajorType.Builder builder = MajorType.newBuilder();
    -          if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
    -            builder.setMinorType(leftField.getType().getMinorType());
    -            builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    -          } else {
    -            List<MinorType> types = Lists.newLinkedList();
    -            types.add(leftField.getType().getMinorType());
    -            types.add(rightField.getType().getMinorType());
    -            MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
    -            if (outputMinorType == null) {
    -              throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
    -                  " on the left side and " + rightField.getType().getMinorType().toString() +
    -                  " on the right side in column " + index + " of UNION ALL");
    -            }
    -            builder.setMinorType(outputMinorType);
    +          List<TypeProtos.MinorType> types = Lists.newLinkedList();
    +          types.add(leftField.getType().getMinorType());
    +          types.add(rightField.getType().getMinorType());
    +          TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
    +          if (outputMinorType == null) {
    +            throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
    +                " on the left side and " + rightField.getType().getMinorType().toString() +
    +                " on the right side in column " + index + " of UNION ALL");
               }
    -
    -          // The output data mode should be as flexible as the more flexible one from the two input tables
    -          List<DataMode> dataModes = Lists.newLinkedList();
    -          dataModes.add(leftField.getType().getMode());
    -          dataModes.add(rightField.getType().getMode());
    -          builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
    -
    -          outputFields.add(MaterializedField.create(leftField.getPath(), builder.build()));
    +          builder.setMinorType(outputMinorType);
             }
    -        ++index;
    -      }
    -
    -      assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
    -    }
     
    -    private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
    -      outputFields = Lists.newArrayList();
    +        // The output data mode should be as flexible as the more flexible one from the two input tables
    +        List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
    +        dataModes.add(leftField.getType().getMode());
    +        dataModes.add(rightField.getType().getMode());
    +        builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
     
    -      final List<String> outputColumnNames = Lists.newArrayList();
    -      final Iterator<MaterializedField> iterForNames = schemaForNames.iterator();
    -      while(iterForNames.hasNext()) {
    -        outputColumnNames.add(iterForNames.next().getPath());
    -      }
    -
    -      final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
    -      for(int i = 0; iterForTypes.hasNext(); ++i) {
    -        MaterializedField field = iterForTypes.next();
    -        outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), builder.build()), callBack);
           }
    +      ++index;
         }
     
    -    public List<MaterializedField> getOutputFields() {
    -      if(outputFields == null) {
    -        throw new NullPointerException("Output fields have not been inferred");
    -      }
    -
    -      return outputFields;
    -    }
    +    assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
    +  }
     
    -    public void killIncoming(boolean sendUpstream) {
    -      leftSide.getRecordBatch().kill(sendUpstream);
    -      rightSide.getRecordBatch().kill(sendUpstream);
    +  private void inferOutputFieldsOneSide(final BatchSchema schema) {
    +    for (MaterializedField field : schema) {
    +      container.addOrGet(field, callBack);
         }
    +  }
     
    -    public RecordBatch getLeftRecordBatch() {
    -      return leftSide.getRecordBatch();
    -    }
    +  private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
    --- End diff --
    
    Move the method to `MaterializedField`. 
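    
    A minimal sketch of that refactoring (hypothetical: the placement and exact signature are assumptions based on this diff, not the committed Drill API):
    
        // Hypothetical instance method on MaterializedField, replacing the static
        // helper in UnionAllRecordBatch. The body is copied from the diff above.
        public boolean hasSameTypeAndMode(MaterializedField that) {
          // Equal minor type (INT, VARCHAR, ...) and equal data mode
          // (REQUIRED / OPTIONAL / REPEATED).
          return getType().getMinorType() == that.getType().getMinorType()
              && getType().getMode() == that.getType().getMode();
        }
    
    The call site in createUnionAller() would then read, for example, container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField()).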


> Schema change problems caused by empty batch
> --------------------------------------------
>
>                 Key: DRILL-5546
>                 URL: https://issues.apache.org/jira/browse/DRILL-5546
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Jinfeng Ni
>            Assignee: Jinfeng Ni
>
> There have been a few JIRAs opened related to schema change failures caused by empty batches.
> This JIRA is opened as an umbrella for all those related JIRAs (such as DRILL-4686, DRILL-4734,
> DRILL-4476, DRILL-4255, etc.).
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
