Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1FBBE200CE1 for ; Thu, 31 Aug 2017 22:53:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1E1C716BB00; Thu, 31 Aug 2017 20:53:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3E30C16BB01 for ; Thu, 31 Aug 2017 22:53:18 +0200 (CEST) Received: (qmail 31602 invoked by uid 500); 31 Aug 2017 20:53:16 -0000 Mailing-List: contact dev-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@drill.apache.org Delivered-To: mailing list dev@drill.apache.org Received: (qmail 31460 invoked by uid 99); 31 Aug 2017 20:53:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Aug 2017 20:53:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C38DDF55AA; Thu, 31 Aug 2017 20:53:15 +0000 (UTC) From: paul-rogers To: dev@drill.apache.org Reply-To: dev@drill.apache.org References: In-Reply-To: Subject: [GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ... 
Content-Type: text/plain Message-Id: <20170831205315.C38DDF55AA@git1-us-west.apache.org> Date: Thu, 31 Aug 2017 20:53:15 +0000 (UTC) archived-at: Thu, 31 Aug 2017 20:53:19 -0000 Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r136440163 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java --- @@ -130,562 +145,248 @@ public IterOutcome innerNext() { } @Override - public WritableBatch getWritableBatch() { - return WritableBatch.get(this); + public int getRecordCount() { + return recordCount; } - private void setValueCount(int count) { - for (ValueVector v : allocationVectors) { - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(count); - } - } - private boolean doAlloc() { - for (ValueVector v : allocationVectors) { - try { - AllocationHelper.allocateNew(v, current.getRecordCount()); - } catch (OutOfMemoryException ex) { - return false; - } + @SuppressWarnings("resource") + private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException { + if (inputBatch.getSchema().getFieldCount() != container.getSchema().getFieldCount()) { + // wrong. 
} - return true; - } - @SuppressWarnings("resource") - private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException { - if (allocationVectors != null) { - for (ValueVector v : allocationVectors) { - v.clear(); - } + if (newSchema) { + createUnionAller(inputBatch); } - allocationVectors = Lists.newArrayList(); - transfers.clear(); + container.zeroVectors(); - // If both sides of Union-All are empty - if(unionAllInput.isBothSideEmpty()) { - for(int i = 0; i < outputFields.size(); ++i) { - final String colName = outputFields.get(i).getPath(); - final MajorType majorType = MajorType.newBuilder() - .setMinorType(MinorType.INT) - .setMode(DataMode.OPTIONAL) - .build(); - - MaterializedField outputField = MaterializedField.create(colName, majorType); - ValueVector vv = container.addOrGet(outputField, callBack); - allocationVectors.add(vv); - } + for (final ValueVector v : this.allocationVectors) { + AllocationHelper.allocateNew(v, inputBatch.getRecordCount()); + } - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0); + for (final ValueVector v : allocationVectors) { + final ValueVector.Mutator m = v.getMutator(); + m.setValueCount(recordCount); + } + + if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; + } else { + return IterOutcome.OK; } + } + + private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException { + transfers.clear(); + allocationVectors.clear();; final ClassGenerator cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
-// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); + int index = 0; - for(VectorWrapper vw : current) { - ValueVector vvIn = vw.getValueVector(); - // get the original input column names - SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath()); - // get the renamed column names - SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getPath()); + for(VectorWrapper vw : inputBatch) { + ValueVector vvIn = vw.getValueVector(); + ValueVector vvOut = container.getValueVector(index).getValueVector(); final ErrorCollector collector = new ErrorCollectorImpl(); // According to input data names, Minortypes, Datamodes, choose to // transfer directly, // rename columns or // cast data types (Minortype or DataMode) - if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) { + if (hasSameTypeAndMode(container.getSchema().getColumn(index), vvIn.getField()) + && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer + ) { // Transfer column - - MajorType outputFieldType = outputFields.get(index).getType(); - MaterializedField outputField = MaterializedField.create(outputPath.getAsUnescapedPath(), outputFieldType); - - /* - todo: Fix if condition when DRILL-4824 is merged - If condition should be changed to: - `if (outputFields.get(index).getPath().equals(inputPath.getAsUnescapedPath())) {` - DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521). - Root cause is missing indication of child column in map types when it is null. - DRILL-4824 is re-working json reader implementation, including map types and will fix this problem. - Reverting condition to previous one to avoid regression till DRILL-4824 is merged. - Unit test - TestJsonReader.testKvgenWithUnionAll(). 
- */ - if (outputFields.get(index).getPath().equals(inputPath)) { - ValueVector vvOut = container.addOrGet(outputField); - TransferPair tp = vvIn.makeTransferPair(vvOut); - transfers.add(tp); + TransferPair tp = vvIn.makeTransferPair(vvOut); + transfers.add(tp); // Copy data in order to rename the column - } else { - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() ); - if (collector.hasErrors()) { - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } - - ValueVector vv = container.addOrGet(outputField, callBack); - allocationVectors.add(vv); - TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getPath())); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); - cg.addExpr(write); - } - // Cast is necessary + } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) { + continue; } else { - LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry()); + SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath()); + MaterializedField inField = vvIn.getField(); + MaterializedField outputField = vvOut.getField(); + + LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry()); + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. 
Errors:\n %s.", collector.toErrorString())); } // If the inputs' DataMode is required and the outputs' DataMode is not required // cast to the one with the least restriction - if(vvIn.getField().getType().getMode() == DataMode.REQUIRED - && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) { - expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector); + if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED --- End diff -- Makes sense. Thanks for the explanation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---