From: jinfengni
To: dev@drill.apache.org
Reply-To: dev@drill.apache.org
Subject: [GitHub] drill pull request #906: DRILL-5546: Handle schema change exception failure ...
Message-Id: <20170829070343.3DE20F32EB@git1-us-west.apache.org>
Date: Tue, 29 Aug 2017 07:03:43 +0000 (UTC)

Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r135708467

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---
@@ -152,97 +157,75 @@ public void kill(boolean sendUpstream) {
     }
   }
 
-  private void releaseAssets() {
-    container.zeroVectors();
-  }
-
-  private void clearFieldVectorMap() {
-    for (final ValueVector v : mutator.fieldVectorMap().values()) {
-      v.clear();
-    }
-  }
-
   @Override
   public IterOutcome next() {
     if (done) {
       return IterOutcome.NONE;
     }
     oContext.getStats().startProcessing();
     try {
-      try {
-        injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
-
-        currentReader.allocate(mutator.fieldVectorMap());
-      } catch (OutOfMemoryException e) {
-        clearFieldVectorMap();
-        throw UserException.memoryError(e).build(logger);
-      }
-      while ((recordCount = currentReader.next()) == 0) {
+      while (true) {
         try {
-          if (!readers.hasNext()) {
-            // We're on the last reader, and it has no (more) rows.
-            currentReader.close();
-            releaseAssets();
-            done = true; // have any future call to next() return NONE
-
-            if (mutator.isNewSchema()) {
-              // This last reader has a new schema (e.g., we have a zero-row
-              // file or other source). (Note that some sources have a non-
-              // null/non-trivial schema even when there are no rows.)
+          injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
+          currentReader.allocate(mutator.fieldVectorMap());
+        } catch (OutOfMemoryException e) {
+          clearFieldVectorMap();
+          throw UserException.memoryError(e).build(logger);
+        }
-              container.buildSchema(SelectionVectorMode.NONE);
-              schema = container.getSchema();
+        recordCount = currentReader.next();
+        Preconditions.checkArgument(recordCount >= 0,
+            "recordCount from RecordReader.next() should not be negative");
-              return IterOutcome.OK_NEW_SCHEMA;
-            }
-            return IterOutcome.NONE;
-          }
-          // At this point, the reader that hit its end is not the last reader.
+        boolean isNewRegularSchema = mutator.isNewSchema();
+        // We should skip the reader, when recordCount = 0 && ! isNewRegularSchema.
+        // Add/set implicit column vectors, only when reader gets > 0 row, or
+        // when reader gets 0 row but with a schema with new field added
+        if (recordCount > 0 || isNewRegularSchema) {
+          addImplicitVectors();
+          populateImplicitVectors();
+        }
-          // If all the files we have read so far are just empty, the schema is not useful
-          if (! hasReadNonEmptyFile) {
-            container.clear();
-            clearFieldVectorMap();
-            mutator.clear();
-          }
+        boolean isNewImplicitSchema = mutator.isNewSchema();
+        for (VectorWrapper w : container) {
+          w.getValueVector().getMutator().setValueCount(recordCount);
+        }
+        final boolean isNewSchema = isNewRegularSchema || isNewImplicitSchema;
+        oContext.getStats().batchReceived(0, recordCount, isNewSchema);
+        if (recordCount == 0) {
           currentReader.close();
-          currentReader = readers.next();
-          implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
-          currentReader.setup(oContext, mutator);
-          try {
-            currentReader.allocate(mutator.fieldVectorMap());
-          } catch (OutOfMemoryException e) {
-            clearFieldVectorMap();
-            throw UserException.memoryError(e).build(logger);
+          if (isNewSchema) {
+            // current reader presents a new schema in mutator even though it has 0 row.
+            // This could happen when data sources have a non-trivial schema with 0 row.
+            container.buildSchema(SelectionVectorMode.NONE);
+            schema = container.getSchema();
+            if (readers.hasNext()) {
+              advanceNextReader();
+            } else {
+              done = true; // indicates the follow-up next() call will return IterOutcome.NONE.
+            }
+            return IterOutcome.OK_NEW_SCHEMA;
+          } else { // not a new schema
+            if (readers.hasNext()) {
+              advanceNextReader();
+              continue; // skip reader returning 0 row and having same schema.
+                        // Skip to next loop iteration with next available reader.
+            } else {
+              releaseAssets(); // All data has been read. Release resource.
+              return IterOutcome.NONE;
+            }
+          }
+        } else { // recordCount > 0
+          if (isNewSchema) {
--- End diff ---

In the revised patch I removed the duplicated copy of the shared code that handled the two different cases, though I did not use the idea of a state. Hopefully the new code looks cleaner.
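
To spell out the branching for anyone skimming the thread, the revised loop boils down to roughly the sketch below. This is a standalone toy model, not the actual ScanBatch code: the reader/batch types are simplified stand-ins, and since the diff above is cut off inside the recordCount > 0 branch, that part reflects only my reading of the intended behavior (report OK_NEW_SCHEMA on a schema change, otherwise OK).

// Toy model of the decision logic in the revised ScanBatch.next() loop.
// Simplified stand-ins only; the real code also allocates vectors, populates
// implicit columns, and tracks the container schema as shown in the diff.
import java.util.Iterator;
import java.util.List;

public class ScanLoopSketch {
  enum IterOutcome { OK, OK_NEW_SCHEMA, NONE }

  // A fake "reader": one batch with a row count and a new-schema flag.
  static final class Batch {
    final int recordCount;
    final boolean newSchema;
    Batch(int recordCount, boolean newSchema) {
      this.recordCount = recordCount;
      this.newSchema = newSchema;
    }
  }

  private final Iterator<Batch> readers;
  private boolean done = false;

  ScanLoopSketch(List<Batch> batches) {
    this.readers = batches.iterator();
  }

  IterOutcome next() {
    if (done) {
      return IterOutcome.NONE;
    }
    while (true) {
      if (!readers.hasNext()) {
        done = true;              // all readers consumed
        return IterOutcome.NONE;
      }
      Batch b = readers.next();   // stands in for currentReader.next()
      if (b.recordCount == 0) {
        if (b.newSchema) {
          // 0 rows but a reportable schema (e.g. an empty file with columns):
          // publish the schema; mark done if this was the last reader.
          if (!readers.hasNext()) {
            done = true;
          }
          return IterOutcome.OK_NEW_SCHEMA;
        }
        continue;                 // 0 rows, same schema: skip to the next reader
      }
      // recordCount > 0 (inferred, since the diff is cut off here):
      // schema change first, otherwise a plain data batch.
      return b.newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
    }
  }

  public static void main(String[] args) {
    ScanLoopSketch scan = new ScanLoopSketch(List.of(
        new Batch(0, true),    // empty source with a schema -> OK_NEW_SCHEMA
        new Batch(0, false),   // empty source, same schema  -> skipped
        new Batch(5, false))); // data batch                 -> OK
    System.out.println(scan.next()); // OK_NEW_SCHEMA
    System.out.println(scan.next()); // OK
    System.out.println(scan.next()); // NONE
  }
}

Running it prints OK_NEW_SCHEMA, OK, NONE for the three fake readers, which matches the batch sequence the revised loop is meant to hand downstream for the empty-source cases DRILL-5546 is about.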