drill-dev mailing list archives

From jinfengni <...@git.apache.org>
Subject [GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Date Wed, 30 Aug 2017 18:37:43 GMT
Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r136154993
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---
    @@ -130,562 +145,248 @@ public IterOutcome innerNext() {
       }
     
       @Override
    -  public WritableBatch getWritableBatch() {
    -    return WritableBatch.get(this);
    +  public int getRecordCount() {
    +    return recordCount;
       }
     
    -  private void setValueCount(int count) {
    -    for (ValueVector v : allocationVectors) {
    -      ValueVector.Mutator m = v.getMutator();
    -      m.setValueCount(count);
    -    }
    -  }
     
    -  private boolean doAlloc() {
    -    for (ValueVector v : allocationVectors) {
    -      try {
    -        AllocationHelper.allocateNew(v, current.getRecordCount());
    -      } catch (OutOfMemoryException ex) {
    -        return false;
    -      }
    +  @SuppressWarnings("resource")
    +  private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
    +    if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) {
    +      // wrong.
         }
    -    return true;
    -  }
     
    -  @SuppressWarnings("resource")
    -  private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
    -    if (allocationVectors != null) {
    -      for (ValueVector v : allocationVectors) {
    -        v.clear();
    -      }
    +    if (newSchema) {
    +      createUnionAller(inputBatch);
         }
     
    -    allocationVectors = Lists.newArrayList();
    -    transfers.clear();
    +    container.zeroVectors();
     
    -    // If both sides of Union-All are empty
    -    if(unionAllInput.isBothSideEmpty()) {
    -      for(int i = 0; i < outputFields.size(); ++i) {
    -        final String colName = outputFields.get(i).getPath();
    -        final MajorType majorType = MajorType.newBuilder()
    -            .setMinorType(MinorType.INT)
    -            .setMode(DataMode.OPTIONAL)
    -            .build();
    -
    -        MaterializedField outputField = MaterializedField.create(colName, majorType);
    -        ValueVector vv = container.addOrGet(outputField, callBack);
    -        allocationVectors.add(vv);
    -      }
    +    for (final ValueVector v : this.allocationVectors) {
    +      AllocationHelper.allocateNew(v, inputBatch.getRecordCount());
    +    }
     
    -      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +    recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
    +    for (final ValueVector v : allocationVectors) {
    +      final ValueVector.Mutator m = v.getMutator();
    +      m.setValueCount(recordCount);
    +    }
    +
    +    if (callBack.getSchemaChangedAndReset()) {
           return IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      return IterOutcome.OK;
         }
    +  }
    +
    +  private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
    +    transfers.clear();
    +    allocationVectors.clear();;
     
         final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
         cg.getCodeGenerator().plainJavaCapable(true);
         // Uncomment out this line to debug the generated code.
    -//    cg.getCodeGenerator().saveCodeForDebugging(true);
    +    //    cg.getCodeGenerator().saveCodeForDebugging(true);
    +
         int index = 0;
    -    for(VectorWrapper<?> vw : current) {
    -       ValueVector vvIn = vw.getValueVector();
    -      // get the original input column names
    -      SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath());
    -      // get the renamed column names
    -      SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getPath());
    +    for(VectorWrapper<?> vw : inputBatch) {
    +      ValueVector vvIn = vw.getValueVector();
    +      ValueVector vvOut = container.getValueVector(index).getValueVector();
     
           final ErrorCollector collector = new ErrorCollectorImpl();
           // According to input data names, Minortypes, Datamodes, choose to
           // transfer directly,
           // rename columns or
           // cast data types (Minortype or DataMode)
    -      if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
    +      if (hasSameTypeAndMode(container.getSchema().getColumn(index), vvIn.getField())
    +          && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
    +          ) {
             // Transfer column
    -
    -        MajorType outputFieldType = outputFields.get(index).getType();
    -        MaterializedField outputField = MaterializedField.create(outputPath.getAsUnescapedPath(), outputFieldType);
    -
    -        /*
    -          todo: Fix if condition when DRILL-4824 is merged
    -          If condition should be changed to:
    -          `if (outputFields.get(index).getPath().equals(inputPath.getAsUnescapedPath())) {`
    -          DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
    -          Root cause is missing indication of child column in map types when it is null.
    -          DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
    -          Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
    -          Unit test - TestJsonReader.testKvgenWithUnionAll().
    -         */
    -        if (outputFields.get(index).getPath().equals(inputPath)) {
    -          ValueVector vvOut = container.addOrGet(outputField);
    -          TransferPair tp = vvIn.makeTransferPair(vvOut);
    -          transfers.add(tp);
    +        TransferPair tp = vvIn.makeTransferPair(vvOut);
    +        transfers.add(tp);
             // Copy data in order to rename the column
    -        } else {
    -          final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
    -          if (collector.hasErrors()) {
    -            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
    -          }
    -
    -          ValueVector vv = container.addOrGet(outputField, callBack);
    -          allocationVectors.add(vv);
    -          TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getPath()));
    -          ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
    -          cg.addExpr(write);
    -        }
    -      // Cast is necessary
    +      } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
    +        continue;
           } else {
    -        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
    +        SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath());
    +        MaterializedField inField = vvIn.getField();
    +        MaterializedField outputField = vvOut.getField();
    +
    +        LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
    +
             if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
             }
     
             // If the inputs' DataMode is required and the outputs' DataMode is not required
             // cast to the one with the least restriction
    -        if(vvIn.getField().getType().getMode() == DataMode.REQUIRED
    -            && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
    -          expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
    +        if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
    +            && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
    +          expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
               if (collector.hasErrors()) {
               throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
             }
     
             // If two inputs' MinorTypes are different,
             // Insert a cast before the Union operation
    -        if(vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
    -          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
    +        if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
    +          expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
               if (collector.hasErrors()) {
               throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
             }
     
    -        final MaterializedField outputField = MaterializedField.create(outputPath.getAsUnescapedPath(), expr.getMajorType());
    -        ValueVector vector = container.addOrGet(outputField, callBack);
    -        allocationVectors.add(vector);
             TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getPath()));
     
    -        boolean useSetSafe = !(vector instanceof FixedWidthVector);
    +        boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
             ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
             cg.addExpr(write);
    +
    +        allocationVectors.add(vvOut);
           }
           ++index;
         }
     
         unionall = context.getImplementationClass(cg.getCodeGenerator());
    -    unionall.setup(context, current, this, transfers);
    -
    -    if(!schemaAvailable) {
    -      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    -      schemaAvailable = true;
    -    }
    -
    -    if(!doAlloc()) {
    -      return IterOutcome.OUT_OF_MEMORY;
    -    }
    -
    -    recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
    -    setValueCount(recordCount);
    -    return IterOutcome.OK;
    -  }
    -
    -  public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
    -    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
    -        && (leftField.getType().getMode() == rightField.getType().getMode());
    -  }
    -
    -  // This method is used by inner class to point the reference `current` to the correct record batch
    -  private void setCurrentRecordBatch(RecordBatch target) {
    -    this.current = target;
    -  }
    -
    -  // This method is used by inner class to clear the current record batch
    -  private void clearCurrentRecordBatch() {
    -    for(VectorWrapper<?> v: current) {
    -      v.clear();
    -    }
    +    unionall.setup(context, inputBatch, this, transfers);
       }
     
    -  public static class UnionAllInput {
    -    private UnionAllRecordBatch unionAllRecordBatch;
    -    private List<MaterializedField> outputFields;
    -    private OneSideInput leftSide;
    -    private OneSideInput rightSide;
    -    private IterOutcome upstream = IterOutcome.NOT_YET;
    -    private boolean leftIsFinish = false;
    -    private boolean rightIsFinish = false;
    -
    -    // These two schemas are obtained from the first record batches of the left and right inputs
    -    // They are used to check if the schema is changed between recordbatches
    -    private BatchSchema leftSchema;
    -    private BatchSchema rightSchema;
    -    private boolean bothEmpty = false;
    -
    -    public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
    -      this.unionAllRecordBatch = unionAllRecordBatch;
    -      leftSide = new OneSideInput(left);
    -      rightSide = new OneSideInput(right);
    -    }
    -
    -    private void setBothSideEmpty(boolean bothEmpty) {
    -      this.bothEmpty = bothEmpty;
    -    }
    -
    -    private boolean isBothSideEmpty() {
    -      return bothEmpty;
    -    }
    -
    -    public IterOutcome nextBatch() throws SchemaChangeException {
    -      if(upstream == RecordBatch.IterOutcome.NOT_YET) {
    -        IterOutcome iterLeft = leftSide.nextBatch();
    -        switch(iterLeft) {
    -          case OK_NEW_SCHEMA:
    -            /*
    -             * If the first few record batches are all empty,
    -             * there is no way to tell whether these empty batches are coming from empty files.
    -             * It is incorrect to infer output types when either side could be coming from empty.
    -             *
    -             * Thus, while-loop is necessary to skip those empty batches.
    -             */
    -            whileLoop:
    -            while(leftSide.getRecordBatch().getRecordCount() == 0) {
    -              iterLeft = leftSide.nextBatch();
    -
    -              switch(iterLeft) {
    -                case STOP:
    -                case OUT_OF_MEMORY:
    -                  return iterLeft;
    -
    -                case NONE:
    -                  // Special Case: The left side was an empty input.
    -                  leftIsFinish = true;
    -                  break whileLoop;
    -
    -                case NOT_YET:
    -                case OK_NEW_SCHEMA:
    -                case OK:
    -                  continue whileLoop;
    -
    -                default:
    -                  throw new IllegalStateException(
    -                      String.format("Unexpected state %s.", iterLeft));
    -              }
    -            }
    -
    -            break;
    -          case STOP:
    -          case OUT_OF_MEMORY:
    -            return iterLeft;
    -
    -          default:
    -            throw new IllegalStateException(
    -                String.format("Unexpected state %s.", iterLeft));
    -        }
    -
    -        IterOutcome iterRight = rightSide.nextBatch();
    -        switch(iterRight) {
    -          case OK_NEW_SCHEMA:
    -            // Unless there is no record batch on the left side of the inputs,
    -            // always start processing from the left side.
    -            if(leftIsFinish) {
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -            } else {
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -            }
    -            // If the record count of the first batch from right input is zero,
    -            // there are two possibilities:
    -            // 1. The right side is an empty input (e.g., file).
    -            // 2. There will be more records carried by later batches.
    -
    -            /*
    -             * If the first few record batches are all empty,
    -             * there is no way to tell whether these empty batches are coming from empty files.
    -             * It is incorrect to infer output types when either side could be coming from empty.
    -             *
    -             * Thus, while-loop is necessary to skip those empty batches.
    -             */
    -            whileLoop:
    -            while(rightSide.getRecordBatch().getRecordCount() == 0) {
    -              iterRight = rightSide.nextBatch();
    -              switch(iterRight) {
    -                case STOP:
    -                case OUT_OF_MEMORY:
    -                  return iterRight;
    -
    -                case NONE:
    -                  // Special Case: The right side was an empty input.
    -                  rightIsFinish = true;
    -                  break whileLoop;
    -
    -                case NOT_YET:
    -                case OK_NEW_SCHEMA:
    -                case OK:
    -                  continue whileLoop;
    -
    -                default:
    -                  throw new IllegalStateException(
    -                      String.format("Unexpected state %s.", iterRight));
    -              }
    -            }
    -
    -            if(leftIsFinish && rightIsFinish) {
    -              setBothSideEmpty(true);
    -            }
    -
    -            inferOutputFields();
    -            break;
    -
    -          case STOP:
    -          case OUT_OF_MEMORY:
    -            return iterRight;
    -
    -          default:
    -            throw new IllegalStateException(
    -                String.format("Unexpected state %s.", iterRight));
    -        }
    -
    -
    -
    -        upstream = IterOutcome.OK_NEW_SCHEMA;
    -        return upstream;
    -      } else {
    -        if(isBothSideEmpty()) {
    -          return IterOutcome.NONE;
    -        }
    -
    -        unionAllRecordBatch.clearCurrentRecordBatch();
    -
    -        if(leftIsFinish && rightIsFinish) {
    -          upstream = IterOutcome.NONE;
    -          return upstream;
    -        } else if(leftIsFinish) {
    -          IterOutcome iterOutcome = rightSide.nextBatch();
    -
    -          switch(iterOutcome) {
    -            case NONE:
    -              rightIsFinish = true;
    -              // fall through
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK_NEW_SCHEMA:
    -              if(!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
    -                throw new SchemaChangeException("Schema change detected in the right
input of Union-All. This is not currently supported");
    -              }
    -              iterOutcome = IterOutcome.OK;
    -              // fall through
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
    -          }
    -        } else if(rightIsFinish) {
    -          IterOutcome iterOutcome = leftSide.nextBatch();
    -          switch(iterOutcome) {
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -            case NONE:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
    -          }
    -        } else {
    -          IterOutcome iterOutcome = leftSide.nextBatch();
    -
    -          switch(iterOutcome) {
    -            case STOP:
    -            case OUT_OF_MEMORY:
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case OK_NEW_SCHEMA:
    -              if(!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
    -                throw new SchemaChangeException("Schema change detected in the left input
of Union-All. This is not currently supported");
    -              }
    -
    -              iterOutcome = IterOutcome.OK;
    -              // fall through
    -            case OK:
    -              unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
    -              upstream = iterOutcome;
    -              return upstream;
    -
    -            case NONE:
    -              unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
    -              upstream = IterOutcome.OK;
    -              leftIsFinish = true;
    -              return upstream;
    -
    -            default:
    -              throw new IllegalStateException(String.format("Unknown state %s.", upstream));
    -          }
    -        }
    -      }
    -    }
     
    -    /**
    -     *
    -     * Summarize the inference in the four different situations:
    -     * First of all, the field names are always determined by the left side
    -     * (Even when the left side is from an empty file, we have the column names.)
    -     *
    -     * Cases:
    -     * 1. Left: non-empty; Right: non-empty
    -     *      types determined by both sides with implicit casting involved
    -     * 2. Left: empty; Right: non-empty
    -     *      type from the right
    -     * 3. Left: non-empty; Right: empty
    -     *      types from the left
    -     * 4. Left: empty; Right: empty
    -     *      types are nullable integer
    -     */
    -    private void inferOutputFields() {
    -      if(!leftIsFinish && !rightIsFinish) {
    -        // Both sides are non-empty
    -        inferOutputFieldsBothSide();
    -      } else if(!rightIsFinish) {
    -        // Left side is non-empty
    -        // While use left side's column names as output column names,
    -        // use right side's column types as output column types.
    -        inferOutputFieldsFromSingleSide(
    -            leftSide.getRecordBatch().getSchema(),
    -            rightSide.getRecordBatch().getSchema());
    +  // The output table's column names always follow the left table,
    +  // where the output type is chosen based on DRILL's implicit casting rules
    +  private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
    +//    outputFields = Lists.newArrayList();
    +    final Iterator<MaterializedField> leftIter = leftSchema.iterator();
    +    final Iterator<MaterializedField> rightIter = rightSchema.iterator();
    +
    +    int index = 1;
    +    while (leftIter.hasNext() && rightIter.hasNext()) {
    +      MaterializedField leftField  = leftIter.next();
    +      MaterializedField rightField = rightIter.next();
    +
    +      if (hasSameTypeAndMode(leftField, rightField)) {
    +        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
    +        builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), builder.build()), callBack);
    +      } else if (Types.isUntypedNull(rightField.getType())) {
    +        container.addOrGet(leftField, callBack);
    +      } else if (Types.isUntypedNull(leftField.getType())) {
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), rightField.getType()), callBack);
           } else {
    -        // Either right side is empty or both are empty
    -        // Using left side's schema is sufficient
    -        inferOutputFieldsFromSingleSide(
    -            leftSide.getRecordBatch().getSchema(),
    -            leftSide.getRecordBatch().getSchema());
    -      }
    -    }
    -
    -    // The output table's column names always follow the left table,
    -    // where the output type is chosen based on DRILL's implicit casting rules
    -    private void inferOutputFieldsBothSide() {
    -      outputFields = Lists.newArrayList();
    -      leftSchema = leftSide.getRecordBatch().getSchema();
    -      rightSchema = rightSide.getRecordBatch().getSchema();
    -      Iterator<MaterializedField> leftIter = leftSchema.iterator();
    -      Iterator<MaterializedField> rightIter = rightSchema.iterator();
    -
    -      int index = 1;
    -      while (leftIter.hasNext() && rightIter.hasNext()) {
    -        MaterializedField leftField  = leftIter.next();
    -        MaterializedField rightField = rightIter.next();
    -
    -        if (hasSameTypeAndMode(leftField, rightField)) {
    -          MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
    +        // If the output type is not the same,
    +        // cast the column of one of the table to a data type which is the Least Restrictive
    +        TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
    +        if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
    +          builder.setMinorType(leftField.getType().getMinorType());
               builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    -          outputFields.add(MaterializedField.create(leftField.getPath(), builder.build()));
             } else {
    -          // If the output type is not the same,
    -          // cast the column of one of the table to a data type which is the Least Restrictive
    -          MajorType.Builder builder = MajorType.newBuilder();
    -          if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
    -            builder.setMinorType(leftField.getType().getMinorType());
    -            builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
    -          } else {
    -            List<MinorType> types = Lists.newLinkedList();
    -            types.add(leftField.getType().getMinorType());
    -            types.add(rightField.getType().getMinorType());
    -            MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
    -            if (outputMinorType == null) {
    -            throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
    -                " on the left side and " + rightField.getType().getMinorType().toString() +
    -                  " on the right side in column " + index + " of UNION ALL");
    -            }
    -            builder.setMinorType(outputMinorType);
    +          List<TypeProtos.MinorType> types = Lists.newLinkedList();
    +          types.add(leftField.getType().getMinorType());
    +          types.add(rightField.getType().getMinorType());
    +          TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
    +          if (outputMinorType == null) {
    +            throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
    +                " on the left side and " + rightField.getType().getMinorType().toString() +
    +                " on the right side in column " + index + " of UNION ALL");
               }
    -
    -          // The output data mode should be as flexible as the more flexible one from the two input tables
    -          List<DataMode> dataModes = Lists.newLinkedList();
    -          dataModes.add(leftField.getType().getMode());
    -          dataModes.add(rightField.getType().getMode());
    -          builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
    -
    -          outputFields.add(MaterializedField.create(leftField.getPath(), builder.build()));
    +          builder.setMinorType(outputMinorType);
             }
    -        ++index;
    -      }
    -
    -      assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
    -    }
     
    -    private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
    -      outputFields = Lists.newArrayList();
    +        // The output data mode should be as flexible as the more flexible one from the two input tables
    +        List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
    +        dataModes.add(leftField.getType().getMode());
    +        dataModes.add(rightField.getType().getMode());
    +        builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
     
    -      final List<String> outputColumnNames = Lists.newArrayList();
    -      final Iterator<MaterializedField> iterForNames = schemaForNames.iterator();
    -      while(iterForNames.hasNext()) {
    -        outputColumnNames.add(iterForNames.next().getPath());
    -      }
    -
    -      final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
    -      for(int i = 0; iterForTypes.hasNext(); ++i) {
    -        MaterializedField field = iterForTypes.next();
    -        outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
    +        container.addOrGet(MaterializedField.create(leftField.getPath(), builder.build()), callBack);
           }
    +      ++index;
         }
     
    -    public List<MaterializedField> getOutputFields() {
    -      if(outputFields == null) {
    -        throw new NullPointerException("Output fields have not been inferred");
    -      }
    -
    -      return outputFields;
    -    }
    +    assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
    --- End diff --
    
    This is also in the original code [1]; it seems git diff did not do a good job here. I think it might need cleanup, but that's probably out of scope for this JIRA.
    
    1. https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java#L620
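    
    As an aside for readers skimming the diff: the assert captures an invariant established at planning time, namely that both UNION ALL inputs expose the same column count, so the two schema iterators must exhaust together. A minimal, self-contained sketch of that positional pairing (hypothetical helper names, not Drill's API):
    
        import java.util.Iterator;
        import java.util.List;
        import java.util.function.BiConsumer;
    
        class ColumnPairing {
          // Walk two schemas positionally, like the leftIter/rightIter loop in
          // inferOutputFieldsBothSide(); a leftover column on either side means a
          // column-count mismatch slipped past planning-time validation.
          static <T> void forEachPair(List<T> left, List<T> right, BiConsumer<T, T> body) {
            Iterator<T> l = left.iterator();
            Iterator<T> r = right.iterator();
            while (l.hasNext() && r.hasNext()) {
              body.accept(l.next(), r.next());
            }
            assert !l.hasNext() && !r.hasNext()
                : "Column-count mismatch should have been detected at planning";
          }
    
          public static void main(String[] args) {
            // Pairs (a, x) and (b, y); lists of unequal length would trip the assert.
            forEachPair(List.of("a", "b"), List.of("x", "y"),
                (lf, rf) -> System.out.println(lf + " <-> " + rf));
          }
        }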


