From: smp@apache.org
To: commits@drill.apache.org
Date: Tue, 02 Dec 2014 13:53:34 -0000
Subject: [2/4] drill git commit: DRILL-1781: Fast Complex Schema

DRILL-1781: Fast Complex Schema

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3581a327
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3581a327
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3581a327

Branch: refs/heads/master
Commit: 3581a327e3fad7045615fe03f4da16f1a7f95757
Parents: a60e1db
Author: Steven Phillips
Authored: Wed Nov 19 18:08:18 2014 -0800
Committer: Steven Phillips
Committed: Tue Dec 2 04:55:06 2014 -0800

----------------------------------------------------------------------
 .../drill/exec/physical/impl/RootExec.java      |  5 --
 .../drill/exec/physical/impl/ScanBatch.java     | 42 +---------
 .../drill/exec/physical/impl/ScreenCreator.java | 27 +------
 .../exec/physical/impl/SingleSenderCreator.java | 26 ------
 .../exec/physical/impl/TopN/TopNBatch.java      | 78 +++++++++++-------
 .../exec/physical/impl/WriterRecordBatch.java   | 19 ++---
 .../physical/impl/aggregate/HashAggBatch.java   | 50 ++++++------
 .../impl/aggregate/StreamingAggBatch.java       | 43 +++++-----
 .../BroadcastSenderRootExec.java                | 28 -------
 .../physical/impl/filter/FilterRecordBatch.java |  4 -
 .../impl/flatten/FlattenRecordBatch.java        | 33 --------
 .../exec/physical/impl/join/HashJoinBatch.java  | 51 ++++++------
 .../exec/physical/impl/join/JoinStatus.java     | 29 +++++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 74 ++++++++--------
 .../physical/impl/limit/LimitRecordBatch.java   | 18 +++--
 .../impl/mergereceiver/MergingRecordBatch.java  | 57 +++++++-----
 .../PartitionSenderRootExec.java                | 24 +++---
 .../partitionsender/PartitionerTemplate.java    |  1 +
 .../impl/producer/ProducerConsumerBatch.java    | 25 ------
 .../impl/project/ProjectRecordBatch.java        | 39 +++++----
 .../impl/svremover/RemovingRecordBatch.java     | 10 +--
 .../impl/union/UnionAllRecordBatch.java         | 22 ++----
 .../UnorderedReceiverBatch.java                 | 10 +--
 .../IteratorValidatorBatchIterator.java         |  6 --
 .../window/StreamingWindowFrameRecordBatch.java |  2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  | 58 ++++++------
 .../drill/exec/record/AbstractRecordBatch.java  | 59 +++++++++---
 .../exec/record/AbstractSingleRecordBatch.java  | 35 +++++---
 .../apache/drill/exec/record/RecordBatch.java   |  8 --
 .../org/apache/drill/exec/work/WorkManager.java | 37 +++++---
 .../work/batch/UnlimitedRawBatchBuffer.java     | 34 ++++--
 .../exec/work/fragment/FragmentExecutor.java    |  1 -
 .../exec/nested/TestFastComplexSchema.java      | 83 ++++++++++++++++++++
 .../exec/physical/impl/SimpleRootExec.java      | 14 ----
 .../exec/physical/impl/TestCastFunctions.java   |  2 +-
 .../drill/exec/physical/impl/TestDecimal.java   | 10 +--
 .../physical/impl/TestExtractFunctions.java     |  2 +-
 .../physical/impl/TestSimpleFragmentRun.java    |  2 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  1 +
 .../physical/impl/window/TestWindowFrame.java   |  8 +-
 .../exec/physical/impl/writer/TestWriter.java   |  2 +-
 .../drill/exec/record/vector/TestDateTypes.java |  2 +-
 .../drill/exec/sql/TestWindowFunctions.java     |  2 +
 .../fn/TestJsonReaderWithSparseFiles.java       |  2 +-
 .../vector/complex/writer/TestJsonReader.java   |  8 +-
 45 files changed, 537 insertions(+), 556 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index d9c4e5b..a644c34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -29,11 +29,6 @@ public interface RootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
 
   /**
-   * Generate and send emtpy schema batch
-   */
-  public void buildSchema() throws SchemaChangeException;
-
-  /**
    * Do the next batch of work.
    * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
    */
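The deletion above is the crux of the whole commit: the dedicated buildSchema() pass is removed from RootExec, and schema delivery is folded into the normal next() protocol, where the first batch an operator returns is an empty OK_NEW_SCHEMA batch carrying only the schema. The sketch below illustrates the contract a consumer can rely on after this change; the enum and interface are simplified stand-ins for Drill's IterOutcome and RecordBatch, not the real classes.

enum Outcome { OK_NEW_SCHEMA, OK, NONE, STOP }

interface Batch {
  Outcome next();
  int getRecordCount();
}

class FastSchemaConsumer {
  // Drives an operator under the fast-schema contract: the very first
  // successful next() yields a zero-row batch whose only payload is the schema.
  static void drain(Batch upstream) {
    boolean sawSchema = false;
    for (Outcome out = upstream.next(); out != Outcome.NONE; out = upstream.next()) {
      if (out == Outcome.STOP) {
        return; // upstream failed; stop consuming
      }
      if (out == Outcome.OK_NEW_SCHEMA && !sawSchema) {
        sawSchema = true;
        // schema is known here even though getRecordCount() == 0, so a
        // client can render result headers before any data arrives
        continue;
      }
      // OK (or a later OK_NEW_SCHEMA): a real data batch
      System.out.println(upstream.getRecordCount() + " rows");
    }
  }
}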
 */

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4ed1180..6e1f139 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -80,7 +80,7 @@ public class ScanBatch implements RecordBatch {
   private List partitionVectors;
   private List selectedPartitionColumns;
   private String partitionColumnDesignator;
-  private boolean first = false;
+  private boolean first = true;
   private boolean done = false;
 
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
@@ -121,32 +121,12 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() {
-    IterOutcome outcome = next();
-    if (outcome == IterOutcome.NONE) {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      done = true;
-    }
-    first = true;
-    tempContainer = VectorContainer.getTransferClone(container);
-    for (VectorWrapper w : container) {
-      w.getValueVector().allocateNew();
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   public int getRecordCount() {
     return recordCount;
   }
 
   @Override
   public void kill(boolean sendUpstream) {
-    if (currentReader != null) {
-      currentReader.cleanup();
-    }
-
     if (sendUpstream) {
       done = true;
     } else {
@@ -158,24 +138,11 @@ public class ScanBatch implements RecordBatch {
     container.zeroVectors();
   }
 
-  private void transfer() {
-    container.zeroVectors();
-    for (VectorWrapper w : tempContainer) {
-      MaterializedField field = w.getField();
-      w.getValueVector().makeTransferPair(container.addOrGet(field)).transfer();
-    }
-  }
-
   @Override
   public IterOutcome next() {
     if (done) {
       return IterOutcome.NONE;
     }
-    if (first) {
-      first = false;
-      transfer();
-      return IterOutcome.OK;
-    }
     long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
@@ -193,6 +160,7 @@ public class ScanBatch implements RecordBatch {
         if (!readers.hasNext()) {
           currentReader.cleanup();
           releaseAssets();
+          done = true;
           return IterOutcome.NONE;
         }
         oContext.getStats().startSetup();
@@ -226,14 +194,8 @@ public class ScanBatch implements RecordBatch {
       if (mutator.isNewSchema()) {
         container.buildSchema(SelectionVectorMode.NONE);
         schema = container.getSchema();
-        long t2 = System.nanoTime();
-//        System.out.println((t2 - t1) / recordCount);
-//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK_NEW_SCHEMA;
       } else {
-        long t2 = System.nanoTime();
-//        System.out.println((t2 - t1) / recordCount);
-//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK;
       }
     } catch (Exception ex) {
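Two small but load-bearing details in the ScanBatch hunk: first now starts out true, and done is set when the last reader is exhausted, so every later call to next() returns NONE without touching released readers. A minimal stand-alone illustration of that idempotent-termination shape (not Drill's actual ScanBatch):

import java.util.Iterator;
import java.util.List;

class ExhaustibleScan {
  enum Outcome { OK, NONE }

  private final Iterator<String> readers;
  private boolean done = false;

  ExhaustibleScan(List<String> readers) { this.readers = readers.iterator(); }

  Outcome next() {
    if (done) {
      return Outcome.NONE;   // idempotent: safe to poll after exhaustion
    }
    if (!readers.hasNext()) {
      done = true;           // release resources exactly once, right here
      return Outcome.NONE;
    }
    readers.next();          // read one "batch"
    return Outcome.OK;
  }
}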
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 3a843ea..41f6349 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -87,31 +87,6 @@ public class ScreenCreator implements RootCreator{
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-
-      QueryWritableBatch batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, false, incoming.getSchema());
-      stats.startWait();
-      try {
-        connection.sendResult(listener, batch);
-      } finally {
-        stats.stopWait();
-      }
-      sendCount.increment();
-    } finally {
-      stats.stopProcessing();
-    }
-    materializer = new VectorRecordMaterializer(context, incoming);
-  }
-
-  @Override
   public boolean innerNext() {
     if(!ok){
       stop();
@@ -129,7 +104,7 @@ public class ScreenCreator implements RootCreator{
           .setQueryId(context.getHandle().getQueryId()) //
           .setRowCount(0) //
           .setQueryState(QueryState.FAILED)
-          .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopeed.",
+          .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopped.",
             context.getFailureCause(), logger, verbose))
           .setDef(RecordBatchDef.getDefaultInstance()) //
           .setIsLastChunk(true) //

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index b638de0..bd4ccd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -85,32 +85,6 @@ public class SingleSenderCreator implements RootCreator{
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-
-      FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
-          handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
-
-      stats.startWait();
-      try {
-        tunnel.sendRecordBatch(new RecordSendFailure(), batch);
-      } finally {
-        stats.stopWait();
-      }
-      sendCount.increment();
-    } finally {
-      stats.stopProcessing();
-    }
-  }
-
-  @Override
   public boolean innerNext() {
     if (!ok) {
       incoming.kill(false);
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 400a867..a67f835 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -86,6 +86,10 @@ public class TopNBatch extends AbstractRecordBatch {
   private long countSincePurge;
   private int batchCount;
   private Copier copier;
+  private boolean schemaBuilt = false;
+  private boolean first = true;
+  private int recordCount = 0;
+  private boolean stop;
 
   public TopNBatch(TopN popConfig, FragmentContext context,
       RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -96,7 +100,7 @@ public class TopNBatch extends AbstractRecordBatch {
 
   @Override
   public int getRecordCount() {
-    return sv4.getCount();
+    return recordCount;
   }
 
   @Override
@@ -121,38 +125,43 @@ public class TopNBatch extends AbstractRecordBatch {
     incoming.cleanup();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
+  public void buildSchema() throws SchemaChangeException {
     VectorContainer c = new VectorContainer(oContext);
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      for (VectorWrapper w : incoming) {
-        c.addOrGet(w.getField());
-      }
-      c = VectorContainer.canonicalize(c);
-      for (VectorWrapper w : c) {
-        container.add(w.getValueVector());
-      }
-      container.buildSchema(SelectionVectorMode.NONE);
-      container.setRecordCount(0);
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case OK:
+      case OK_NEW_SCHEMA:
+        for (VectorWrapper w : incoming) {
+          c.addOrGet(w.getField());
+        }
+        c = VectorContainer.canonicalize(c);
+        for (VectorWrapper w : c) {
+          ValueVector v = container.addOrGet(w.getField());
+          v.allocateNew();
+        }
+        container.buildSchema(SelectionVectorMode.NONE);
+        container.setRecordCount(0);
+        return;
+      case STOP:
+        stop = true;
+      case NONE:
+        state = BatchState.DONE;
+      default:
+        return;
     }
   }
 
   @Override
   public IterOutcome innerNext() {
+    if (state == BatchState.DONE) {
+      return IterOutcome.NONE;
+    }
     if (schema != null) {
       if (getSelectionVector4().next()) {
+        recordCount = sv4.getCount();
         return IterOutcome.OK;
       } else {
+        recordCount = 0;
         return IterOutcome.NONE;
       }
     }
@@ -162,7 +171,13 @@ public class TopNBatch extends AbstractRecordBatch {
     outer: while (true) {
       Stopwatch watch = new Stopwatch();
       watch.start();
-      IterOutcome upstream = incoming.next();
+      IterOutcome upstream;
+      if (first) {
+        upstream = IterOutcome.OK_NEW_SCHEMA;
+        first = false;
+      } else {
+        upstream = next(incoming);
+      }
       if (upstream == IterOutcome.OK && schema == null) {
         upstream = IterOutcome.OK_NEW_SCHEMA;
         container.clear();
@@ -185,6 +200,12 @@ public class TopNBatch extends AbstractRecordBatch {
         }
         // fall through.
      case OK:
+        if (incoming.getRecordCount() == 0) {
+          for (VectorWrapper w : incoming) {
+            w.clear();
+          }
+          break;
+        }
         countSincePurge += incoming.getRecordCount();
         batchCount++;
         RecordBatchData batch = new RecordBatchData(incoming);
@@ -204,8 +225,9 @@ public class TopNBatch extends AbstractRecordBatch {
         }
       }
 
-      if (schema == null) {
+      if (schema == null || priorityQueue == null) {
         // builder may be null at this point if the first incoming batch is empty
+        state = BatchState.DONE;
         return IterOutcome.NONE;
       }
 
@@ -218,6 +240,7 @@ public class TopNBatch extends AbstractRecordBatch {
       }
 
       container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      recordCount = sv4.getCount();
       return IterOutcome.OK_NEW_SCHEMA;
 
     } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
@@ -345,11 +368,6 @@ public class TopNBatch extends AbstractRecordBatch {
     }
 
     @Override
-    public IterOutcome buildSchema() throws SchemaChangeException {
-      return null;
-    }
-
-    @Override
     public int getRecordCount() {
       if (sv4 != null) {
         return sv4.getCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index cb0de02..74a674e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -54,7 +54,7 @@ public class WriterRecordBatch extends AbstractRecordBatch {
   private BatchSchema schema;
 
   public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
-    super(writer, context);
+    super(writer, context, false);
     this.incoming = incoming;
 
     FragmentHandle handle = context.getHandle();
@@ -78,17 +78,12 @@ public class WriterRecordBatch extends AbstractRecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    try {
-      stats.startProcessing();
-      setupNewSchema();
-    } catch (Exception e) {
-      throw new SchemaChangeException(e);
-    } finally {
-      stats.stopProcessing();
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
+  public void buildSchema() throws SchemaChangeException {
+//    try {
+//      setupNewSchema();
+//    } catch (Exception e) {
+//      throw new SchemaChangeException(e);
+//    }
   }
 
   @Override
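TopNBatch shows the buildSchema() shape that several operators in this commit share (ExternalSortBatch below is nearly identical): pull one batch from the child via next(incoming), mirror its fields into the operator's own container, allocate the vectors, and expose an empty schema, while STOP and NONE short-circuit into a terminal state. A compressed, self-contained sketch of that flow, with stand-ins for Drill's vector types rather than the real classes:

import java.util.ArrayList;
import java.util.List;

class SchemaFirstOperator {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE, STOP }
  enum BatchState { FIRST, NOT_FIRST, DONE }
  interface Child { Outcome next(); List<String> fields(); }

  final List<String> container = new ArrayList<>();
  BatchState state = BatchState.FIRST;
  boolean stop = false;

  void buildSchema(Child incoming) {
    switch (incoming.next()) {
      case OK:
      case OK_NEW_SCHEMA:
        container.addAll(incoming.fields()); // copy fields; rows come later
        return;
      case STOP:
        stop = true;             // falls through, as in the hunk above
      case NONE:
        state = BatchState.DONE; // innerNext() will return NONE immediately
      default:
        return;
    }
  }
}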
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index a0b8d3f..113e883 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
@@ -58,11 +59,9 @@ public class HashAggBatch extends AbstractRecordBatch {
 
   private HashAggregator aggregator;
   private final RecordBatch incoming;
-  private boolean done = false;
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds ;
   private TypedFieldId[] aggrOutFieldIds ;      // field ids for the outgoing batch
-  private boolean first = true;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
       GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -82,44 +81,41 @@ public class HashAggBatch extends AbstractRecordBatch {
 
   @Override
   public int getRecordCount() {
-    if (done) {
+    if (state == BatchState.DONE) {
       return 0;
     }
     return aggregator.getOutputCount();
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      if (!createAggregator()) {
-        done = true;
-        return IterOutcome.STOP;
-      }
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+  public void buildSchema() throws SchemaChangeException {
+    if (next(incoming) == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      container.buildSchema(SelectionVectorMode.NONE);
+      return;
+    }
+    if (!createAggregator()) {
+      state = BatchState.DONE;
+    }
+    for (VectorWrapper w : container) {
+      w.getValueVector().allocateNew();
     }
   }
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
-
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null || first) {
-      first = false;
+    if (aggregator == null || state == BatchState.FIRST) {
       if (aggregator != null) {
         aggregator.cleanup();
       }
-      IterOutcome outcome = next(incoming);
+      IterOutcome outcome;
+      if (state == BatchState.FIRST) {
+        state = BatchState.NOT_FIRST;
+        outcome = IterOutcome.OK;
+      } else {
+        outcome = next(incoming);
+      }
       if (outcome == IterOutcome.OK) {
         outcome = IterOutcome.OK_NEW_SCHEMA;
       }
@@ -133,7 +129,7 @@ public class HashAggBatch extends AbstractRecordBatch {
         return outcome;
       case OK_NEW_SCHEMA:
         if (!createAggregator()) {
-          done = true;
+          state = BatchState.DONE;
           return IterOutcome.STOP;
         }
         break;
@@ -163,7 +159,7 @@ public class HashAggBatch extends AbstractRecordBatch {
     case CLEANUP_AND_RETURN:
       container.zeroVectors();
       aggregator.cleanup();
-      done = true;
+      state = BatchState.DONE;
       // fall through
     case RETURN_OUTCOME:
       IterOutcome outcome = aggregator.getOutcome();
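The recurring replacement of done/first booleans with state == BatchState.DONE / BatchState.FIRST points at a shared state machine in AbstractRecordBatch, whose hunk appears in the diffstat but is not quoted in this message. The sketch below shows only the innerNext() dispatch idiom HashAggBatch now uses; the types are stand-ins and the surrounding wiring is assumed rather than taken from the commit.

class StateDrivenAgg {
  enum BatchState { FIRST, NOT_FIRST, DONE }
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  private BatchState state = BatchState.FIRST;

  Outcome innerNext() {
    if (state == BatchState.DONE) {
      return Outcome.NONE;             // terminal state is sticky
    }
    Outcome outcome;
    if (state == BatchState.FIRST) {
      // buildSchema() already pulled the first upstream batch, so reuse it
      // instead of calling next() on the child a second time
      state = BatchState.NOT_FIRST;
      outcome = Outcome.OK;
    } else {
      outcome = pullUpstream();
    }
    if (outcome == Outcome.OK) {
      outcome = Outcome.OK_NEW_SCHEMA; // first data batch also announces schema
    }
    return outcome;
  }

  private Outcome pullUpstream() { return Outcome.NONE; } // placeholder child
}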
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 17aaae8..ef85a36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
@@ -59,6 +60,7 @@ public class StreamingAggBatch extends AbstractRecordBatch {
   private final RecordBatch incoming;
   private boolean done = false;
   private boolean first = true;
+  private boolean schemaBuilt = false;
 
   public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context);
@@ -77,34 +79,31 @@ public class StreamingAggBatch extends AbstractRecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      if (!createAggregator()) {
-        done = true;
-        return IterOutcome.STOP;
-      }
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+  public void buildSchema() throws SchemaChangeException {
+    if (next(incoming) == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      container.buildSchema(SelectionVectorMode.NONE);
+      return;
+    }
+    if (!createAggregator()) {
+      state = BatchState.DONE;
+    }
+    for (VectorWrapper w : container) {
+      w.getValueVector().allocateNew();
     }
   }
 
+  @Override
   public IterOutcome innerNext() {
-    if (done) {
-      container.zeroVectors();
-      return IterOutcome.NONE;
-    }
-
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null || first) {
-      first = false;
-      IterOutcome outcome = next(incoming);
+      IterOutcome outcome;
+      if (first && incoming.getRecordCount() > 0) {
+        first = false;
+        outcome = IterOutcome.OK_NEW_SCHEMA;
+      } else {
+        outcome = next(incoming);
+      }
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
       case NONE:

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 4e7d222..22fa047 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -85,34 +85,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-
-      FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
-          handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
-
-      stats.startWait();
-      for (int i = 0; i < tunnels.length; i++) {
-        try {
-          tunnels[i].sendRecordBatch(this.statusHandler, batch);
-        } finally {
-          stats.stopWait();
-        }
-        statusHandler.sendCount.increment();
-      }
-    } finally {
-      stats.stopProcessing();
-    }
-  }
-
-  @Override
   public boolean innerNext() {
     if(!ok) {
       context.fail(statusHandler.ex);

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 7d68e07..064d5c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -80,10 +80,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch{
     container.zeroVectors();
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
-//    for (VectorWrapper v : container) {
-//      ValueVector.Mutator m = v.getValueVector().getMutator();
-//      m.setValueCount(recordCount);
-//    }
 
     return IterOutcome.OK;
   }
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 8d14d2e..e82dd29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -252,39 +252,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch {
     return ref;
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    if ( ! fastSchemaCalled ) {
-      for (VectorWrapper vw : incoming) {
-        if (vw.getField().getPath().equals(popConfig.getColumn())) {
-          if (vw.getValueVector() instanceof MapVector) {
-            // fast schema upstream did not report a repeated type
-            // assume it will be repeated in the actual results and it will fail in execution if it is not
-            container.addOrGet(vw.getField());
-          } else if (! (vw.getValueVector() instanceof RepeatedVector )) {
-            container.addOrGet(vw.getField());
-          } else {
-            TransferPair pair = getFlattenFieldTransferPair();
-            if (pair == null) {
-              continue;
-            }
-            container.add(pair.getTo());
-          }
-        } else {
-          container.addOrGet(vw.getField());
-        }
-      }
-      fastSchemaCalled = true;
-      container.buildSchema(SelectionVectorMode.NONE);
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
-    else {
-      setupNewSchema();
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
-  }
-
   /**
    * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
    * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7f4d03c..7d2557e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -103,9 +103,7 @@ public class HashJoinBatch extends AbstractRecordBatch {
 
   // Schema of the build side
   private BatchSchema rightSchema = null;
-  private boolean first = true;
-  private boolean done = false;
 
   // Generator mapping for the build side
   // Generator mapping for the build side : scalar
@@ -143,10 +141,11 @@ public class HashJoinBatch extends AbstractRecordBatch {
   boolean firstOutputBatch = true;
 
   IterOutcome leftUpstream = IterOutcome.NONE;
+  IterOutcome rightUpstream = IterOutcome.NONE;
 
   private final HashTableStats htStats = new HashTableStats();
 
-  public enum Metric implements MetricDef {
+  public enum Metric implements MetricDef {
 
     NUM_BUCKETS,
     NUM_ENTRIES,
@@ -169,9 +168,9 @@ public class HashJoinBatch extends AbstractRecordBatch {
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    leftUpstream = left.buildSchema();
-    right.buildSchema();
+  protected void buildSchema() throws SchemaChangeException {
+    leftUpstream = next(left);
+    rightUpstream = next(right);
     // Initialize the hash join helper context
     hjHelper = new HashJoinHelper(context, oContext.getAllocator());
     try {
@@ -188,29 +187,27 @@ public class HashJoinBatch extends AbstractRecordBatch {
       setupHashTable();
       hashJoinProbe = setupHashJoinProbe();
       // Build the container schema and set the counts
+      for (VectorWrapper w : container) {
+        w.getValueVector().allocateNew();
+      }
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
       container.setRecordCount(outputRecords);
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
     try {
       /* If we are here for the first time, execute the build phase of the
        * hash join and setup the run time generated class for the probe side
        */
-      if (first) {
-        first = false;
+      if (state == BatchState.FIRST) {
         // Build the hash table, using the build side record batches.
         executeBuildPhase();
//        IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left);
-        hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 0, this, hashTable, hjHelper, joinType);
+        hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable, hjHelper, joinType);
 
         // Update the hash table related stats for the operator
         updateStats(this.hashTable);
@@ -230,8 +227,10 @@ public class HashJoinBatch extends AbstractRecordBatch {
        * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
        * Either case build the output container's schema and return
        */
-      if (outputRecords > 0 || first) {
-        first = false;
+      if (outputRecords > 0 || state == BatchState.FIRST) {
+        if (state == BatchState.FIRST) {
+          state = BatchState.NOT_FIRST;
+        }
 
         for (VectorWrapper v : container) {
@@ -258,10 +257,10 @@ public class HashJoinBatch extends AbstractRecordBatch {
       }
 
       // No more output records, clean up and return
-      done = true;
-      if (first) {
-        return IterOutcome.OK_NEW_SCHEMA;
-      }
+      state = BatchState.DONE;
+//      if (first) {
+//        return IterOutcome.OK_NEW_SCHEMA;
+//      }
       return IterOutcome.NONE;
     } catch (ClassTransformationException | SchemaChangeException | IOException e) {
       context.fail(e);
@@ -308,10 +307,14 @@ public class HashJoinBatch extends AbstractRecordBatch {
   public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
     //Setup the underlying hash table
-    IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
-    if (hashTable == null) {
-      rightUpstream = IterOutcome.OK_NEW_SCHEMA;
-    }
+
+    // skip first batch if count is zero, as it may be an empty schema batch
+    if (right.getRecordCount() == 0) {
+      for (VectorWrapper w : right) {
+        w.clear();
+      }
+      rightUpstream = next(right);
+    }
 
     boolean moreData = true;
@@ -475,7 +478,7 @@ public class HashJoinBatch extends AbstractRecordBatch {
   }
 
   public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, true);
     this.left = left;
    this.right = right;
     this.joinType = popConfig.getJoinType();
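With both sides prefetched in buildSchema(), the build phase has to tolerate a schema-only first batch on the build side; the hunk above clears and skips it before loading the hash table. A stand-alone sketch of that guard, with simplified types in place of Drill's:

class BuildSideLoader {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }
  interface Side { Outcome next(); int recordCount(); void clearVectors(); }

  void executeBuildPhase(Side right, Outcome rightUpstream) {
    // skip the first batch if the count is zero: it may carry only schema
    if (right.recordCount() == 0) {
      right.clearVectors();
      rightUpstream = right.next();
    }
    while (rightUpstream != Outcome.NONE) {
      // ... insert the batch's rows into the hash table here ...
      rightUpstream = right.next();
    }
  }
}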
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 3bc8daa..a7fa5aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -35,6 +35,12 @@ public final class JoinStatus {
     INCOMING, SV4;
   }
 
+  private static enum InitState {
+    INIT,  // initial state
+    CHECK, // need to check if batches are empty
+    READY  // ready to do work
+  }
+
   private static final int LEFT_INPUT = 0;
   private static final int RIGHT_INPUT = 1;
@@ -55,7 +61,7 @@ public final class JoinStatus {
   private final JoinRelType joinType;
 
   public boolean ok = true;
-  private boolean initialSet = false;
+  private InitState initialSet = InitState.INIT;
   private boolean leftRepeating = false;
 
   public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
@@ -75,10 +81,23 @@ public final class JoinStatus {
   }
 
   public final void ensureInitial() {
-    if(!initialSet) {
-      this.lastLeft = nextLeft();
-      this.lastRight = nextRight();
-      initialSet = true;
+    switch(initialSet) {
+      case INIT:
+        this.lastLeft = nextLeft();
+        this.lastRight = nextRight();
+        initialSet = InitState.CHECK;
+        break;
+      case CHECK:
+        if (lastLeft != IterOutcome.NONE && left.getRecordCount() == 0) {
+          this.lastLeft = nextLeft();
+        }
+        if (lastRight != IterOutcome.NONE && right.getRecordCount() == 0) {
+          this.lastRight = nextRight();
+        }
+        initialSet = InitState.READY;
+        // fall through
+      default:
+        break;
    }
   }
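JoinStatus's initialization is split in two because the first batch pulled from each side may now be a zero-row schema batch: INIT pulls once per side, CHECK re-pulls any side that only delivered schema, and READY makes further calls no-ops. The same logic in a runnable, stand-alone form (stand-in types, not Drill's JoinStatus):

class JoinInit {
  enum InitState { INIT, CHECK, READY }
  enum Outcome { OK, NONE }
  interface Side { Outcome next(); int recordCount(); }

  private InitState initialSet = InitState.INIT;
  private Outcome lastLeft = Outcome.OK;
  private Outcome lastRight = Outcome.OK;

  void ensureInitial(Side left, Side right) {
    switch (initialSet) {
      case INIT:
        lastLeft = left.next();
        lastRight = right.next();
        initialSet = InitState.CHECK;
        break;
      case CHECK:
        // a non-exhausted side whose prefetched batch had no rows only
        // carried schema; pull again to reach real data
        if (lastLeft != Outcome.NONE && left.recordCount() == 0) {
          lastLeft = left.next();
        }
        if (lastRight != Outcome.NONE && right.recordCount() == 0) {
          lastRight = right.next();
        }
        initialSet = InitState.READY;
        // fall through
      default:
        break;
    }
  }
}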
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 518971d..87b12b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -111,10 +111,9 @@ public class MergeJoinBatch extends AbstractRecordBatch {
   private final JoinRelType joinType;
   private JoinWorker worker;
   public MergeJoinBatchBuilder batchBuilder;
-  private boolean done = false;
 
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, true);
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
@@ -136,24 +135,13 @@ public class MergeJoinBatch extends AbstractRecordBatch {
     return status.getOutPosition();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    left.buildSchema();
-    right.buildSchema();
-    try {
-      allocateBatch(true);
-      worker = generateNewWorker();
-    } catch (IOException | ClassTransformationException e) {
-      throw new SchemaChangeException(e);
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
+  public void buildSchema() throws SchemaChangeException {
+    status.ensureInitial();
+    allocateBatch(true);
   }
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
     // we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
     status.ensureInitial();
@@ -214,7 +202,7 @@ public class MergeJoinBatch extends AbstractRecordBatch {
       case NO_MORE_DATA:
         logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
         setRecordCountInContainer();
-        done = true;
+        state = BatchState.DONE;
         return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
       case SCHEMA_CHANGED:
         worker = null;
@@ -437,35 +425,37 @@ public class MergeJoinBatch extends AbstractRecordBatch {
     // allocate new batch space.
     container.zeroVectors();
 
-    //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
-    int leftCount = worker == null ? left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
-    int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
-    int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
+    boolean leftAllowed = status.getLastLeft() != IterOutcome.NONE;
+    boolean rightAllowed = status.getLastRight() != IterOutcome.NONE;
 
     if (newSchema) {
       // add fields from both batches
-      for (VectorWrapper w : left) {
-        MajorType inputType = w.getField().getType();
-        MajorType outputType;
-        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
+      if (leftAllowed) {
+        for (VectorWrapper w : left) {
+          MajorType inputType = w.getField().getType();
+          MajorType outputType;
+          if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+            outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+          } else {
+            outputType = inputType;
+          }
+          MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+          container.addOrGet(newField);
         }
-        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
-        container.addOrGet(newField);
       }
-      for (VectorWrapper w : right) {
-        MajorType inputType = w.getField().getType();
-        MajorType outputType;
-        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
+      if (rightAllowed) {
+        for (VectorWrapper w : right) {
+          MajorType inputType = w.getField().getType();
+          MajorType outputType;
+          if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+            outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+          } else {
+            outputType = inputType;
+          }
+          MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+          container.addOrGet(newField);
         }
-        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
-        container.addOrGet(newField);
       }
     }
@@ -489,7 +479,8 @@ public class MergeJoinBatch extends AbstractRecordBatch {
     // materialize value vector readers from join expression
     LogicalExpression materializedLeftExpr;
-    if (worker == null || status.isLeftPositionAllowed()) {
+    if (status.getLastLeft() != IterOutcome.NONE) {
+//    if (status.isLeftPositionAllowed()) {
       materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
     } else {
       materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
@@ -500,7 +491,8 @@ public class MergeJoinBatch extends AbstractRecordBatch {
     }
 
     LogicalExpression materializedRightExpr;
-    if (worker == null || status.isRightPositionAllowed()) {
+//    if (status.isRightPositionAllowed()) {
+    if (status.getLastRight() != IterOutcome.NONE) {
      materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
     } else {
       materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
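The materialization change above handles an outer-join side that never produced a schema at all (its very first pull returned NONE): instead of consulting the worker or cursor position, the join substitutes a typed null constant so code generation still has an expression to compile. A tiny sketch of that substitution decision, using stand-ins for Drill's expression machinery:

class JoinExprChooser {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  interface Expr {}
  static final class ColumnRef implements Expr {
    final String name;
    ColumnRef(String name) { this.name = name; }
  }
  static final class TypedNull implements Expr {} // stands in for TypedNullConstant(OPTIONAL INT)

  // Materialize the join key for one side; a side with no schema (NONE on
  // the first pull) contributes a typed null instead of a real column.
  static Expr materialize(String column, Outcome lastOutcomeForSide) {
    if (lastOutcomeForSide != Outcome.NONE) {
      return new ColumnRef(column);
    }
    return new TypedNull();
  }
}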
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 02e1a92..7e66893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -43,6 +43,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch {
   private boolean noEndLimit;
   private boolean skipBatch;
   private boolean done = false;
+  private boolean first = true;
   List transfers = Lists.newArrayList();
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -93,10 +94,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch {
       return IterOutcome.NONE;
     }
 
-    if(!noEndLimit && recordsLeft <= 0) {
+    if(!first && !noEndLimit && recordsLeft <= 0) {
       incoming.kill(true);
 
-      IterOutcome upStream = incoming.next();
+      IterOutcome upStream = next(incoming);
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
@@ -104,7 +105,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch {
         for (VectorWrapper wrapper : incoming) {
           wrapper.getValueVector().clear();
         }
-        upStream = incoming.next();
+        upStream = next(incoming);
       }
 
       return IterOutcome.NONE;
@@ -119,11 +120,18 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch {
 
   @Override
   protected IterOutcome doWork() {
-    for(TransferPair tp : transfers) {
-      tp.transfer();
+    if (first) {
+      first = false;
     }
     skipBatch = false;
     int recordCount = incoming.getRecordCount();
+    if (recordCount == 0) {
+      skipBatch = true;
+      return IterOutcome.OK;
+    }
+    for(TransferPair tp : transfers) {
+      tp.transfer();
+    }
     if(recordCount <= recordsToSkip) {
       recordsToSkip -= recordCount;
       skipBatch = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8da8f96..acbb755 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -107,7 +108,7 @@ public class MergingRecordBatch extends AbstractRecordBatch
   private int[] batchOffsets;
   private PriorityQueue pqueue;
   private RawFragmentBatch emptyBatch = null;
-  private boolean done = false;
+  private RawFragmentBatch[] tempBatchHolder; //
 
   public static enum Metric implements MetricDef{
     BYTES_RECEIVED,
@@ -123,7 +124,7 @@ public class MergingRecordBatch extends AbstractRecordBatch
   public MergingRecordBatch(FragmentContext context,
                             MergingReceiverPOP config,
                             RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
-    super(config, context, new OperatorContext(config, context, false));
+    super(config, context, true, new OperatorContext(config, context, false));
    //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
@@ -151,9 +152,6 @@ public class MergingRecordBatch extends AbstractRecordBatch
     if (fragProviders.length == 0) {
       return IterOutcome.NONE;
     }
-    if (done) {
-      return IterOutcome.NONE;
-    }
     boolean schemaChanged = false;
 
     if (prevBatchWasFull) {
@@ -175,11 +173,18 @@ public class MergingRecordBatch extends AbstractRecordBatch
       // set up each (non-empty) incoming record batch
       List rawBatches = Lists.newArrayList();
-      boolean firstBatch = true;
+      int p = 0;
       for (RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
-          rawBatch = getNext(provider);
+          // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+          if (tempBatchHolder[p] != null) {
+            rawBatch = tempBatchHolder[p];
+            tempBatchHolder[p] = null;
+          } else {
+            rawBatch = getNext(provider);
+          }
+          p++;
           if (rawBatch == null && context.isCancelled()) {
             return IterOutcome.STOP;
           }
@@ -190,7 +195,8 @@ public class MergingRecordBatch extends AbstractRecordBatch
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
         } else {
-          if (emptyBatch == null) {
+          // save an empty batch to use for schema purposes. ignore batch if it contains no fields, and thus no schema
+          if (emptyBatch == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
             emptyBatch = rawBatch;
           }
           try {
@@ -406,9 +412,7 @@ public class MergingRecordBatch extends AbstractRecordBatch
     }
 
     if (pqueue.isEmpty()) {
-      if (!done) {
-        done = !done;
-      }
+      state = BatchState.DONE;
     }
 
     if (schemaChanged) {
@@ -429,22 +433,33 @@ public class MergingRecordBatch extends AbstractRecordBatch
     return outgoingContainer.getSchema();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
+  public void buildSchema() {
+    // find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
+    tempBatchHolder = new RawFragmentBatch[fragProviders.length];
+    int i = 0;
     try {
-      RawFragmentBatch batch = getNext(fragProviders[0]);
-      for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
-        outgoingContainer.addOrGet(MaterializedField.create(field));
+      while (true) {
+        if (i >= fragProviders.length) {
+          state = BatchState.DONE;
+          return;
+        }
+        RawFragmentBatch batch = getNext(fragProviders[i]);
+        if (batch.getHeader().getDef().getFieldCount() == 0) {
+          i++;
+          continue;
+        }
+        tempBatchHolder[i] = batch;
+        for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
+          ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+          v.allocateNew();
+        }
+        break;
       }
     } catch (IOException e) {
-      throw new SchemaChangeException(e);
-    } finally {
-      stats.stopProcessing();
+      throw new DrillRuntimeException(e);
     }
     outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
     outgoingContainer.buildSchema(SelectionVectorMode.NONE);
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   @Override
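MergingRecordBatch cannot simply re-read its receivers: a raw batch pulled while building the schema would otherwise be lost. Hence tempBatchHolder: buildSchema() stashes the first field-bearing batch per provider, and innerNext() drains the stash before calling getNext() again. The same stash-then-drain pattern in isolation (stand-in types, not Drill's RawFragmentBatch machinery):

class StashingReceiver {
  interface Provider { String getNext(); } // returns null when exhausted

  private final Provider[] providers;
  private final String[] tempBatchHolder;

  StashingReceiver(Provider[] providers) {
    this.providers = providers;
    this.tempBatchHolder = new String[providers.length];
  }

  // Called once up front: consume batches only as far as needed for a
  // schema, keeping what was consumed so no data is lost.
  void buildSchema() {
    for (int i = 0; i < providers.length; i++) {
      String batch = providers[i].getNext();
      if (batch != null) {
        tempBatchHolder[i] = batch;
        return;
      }
    }
  }

  // Called per provider afterwards: the stashed batch is served first.
  String next(int p) {
    String stashed = tempBatchHolder[p];
    if (stashed != null) {
      tempBatchHolder[p] = null;
      return stashed;
    }
    return providers[p].getNext();
  }
}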
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 9e3cfe5..f09acaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -110,25 +110,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
     return true;
   }
 
-  @Override
-  public void buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    stats.startProcessing();
+  private void buildSchema() throws SchemaChangeException {
+    createPartitioner();
     try {
-      createPartitioner();
-      try {
-        partitioner.flushOutgoingBatches(false, true);
-      } catch (IOException e) {
-        throw new SchemaChangeException(e);
-      }
-    } finally {
-      stats.stopProcessing();
+      partitioner.flushOutgoingBatches(false, true);
+    } catch (IOException e) {
+      throw new SchemaChangeException(e);
     }
   }
 
   @Override
   public boolean innerNext() {
-    boolean newSchema = false;
 
     if (!ok) {
       stop();
@@ -146,7 +138,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     if (first && out == IterOutcome.OK) {
-      first = false;
       out = IterOutcome.OK_NEW_SCHEMA;
     }
     switch(out){
@@ -179,6 +170,11 @@ public class PartitionSenderRootExec extends BaseRootExec {
           partitioner.clear();
         }
         createPartitioner();
+        // flush to send schema downstream
+        if (first) {
+          first = false;
+          partitioner.flushOutgoingBatches(false, true);
+        }
       } catch (IOException e) {
         incoming.kill(false);
         logger.error("Error while flushing outgoing batches", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index a16e29f..20f6195 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -364,6 +364,7 @@ public abstract class PartitionerTemplate implements Partitioner {
 
         // allocate a new value vector
         ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
+        v.getValueVector().makeTransferPair(outgoingVector);
         outgoingVector.allocateNew();
         vectorContainer.add(outgoingVector);
       }
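A sender has no container to hand downstream, so PartitionSenderRootExec above publishes schema by flushing its freshly created (and still empty) outgoing batches once, on the first iteration; the PartitionerTemplate line wires a transfer pair so those outgoing vectors track the incoming field types. A sketch of the flush-once guard, with a stand-in Partitioner interface rather than Drill's:

class FirstFlushSender {
  interface Partitioner {
    // isLastBatch, schemaChanged: an empty flush still carries the schema header
    void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws java.io.IOException;
  }

  private boolean first = true;

  void afterCreatePartitioner(Partitioner partitioner) throws java.io.IOException {
    if (first) {
      first = false;
      // flush to send schema downstream before any rows exist
      partitioner.flushOutgoingBatches(false, true);
    }
  }
}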
http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 132c41e..4c9b33b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -58,31 +58,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      stats.startSetup();
-      try {
-        for (VectorWrapper w : incoming) {
-          container.addOrGet(w.getField());
-        }
-      } finally {
-        stats.stopSetup();
-      }
-    } finally {
-      stats.stopProcessing();
-    }
-    container.buildSchema(incoming.getSchema().getSelectionVectorMode());
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   public IterOutcome innerNext() {
     if (!running) {
       producer.start();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 27cb1f2..f822e55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.List;
 
@@ -61,8 +63,8 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -83,6 +85,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch {
   private boolean buildingSchema = true;
   private static final String EMPTY_STRING = "";
+  private boolean first = true;
 
   private class ClassifierResult {
     public boolean isStar = false;
@@ -136,9 +139,29 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch {
 
   @Override
   protected IterOutcome doWork() {
-//    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
+
+    if (first && incomingRecordCount == 0) {
+      if (complexWriters != null) {
+        IterOutcome next = null;
+        while (incomingRecordCount == 0) {
+          next = next(incoming);
+          if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) {
+            return next;
+          }
+          incomingRecordCount = incoming.getRecordCount();
+        }
+        if (next == IterOutcome.OK_NEW_SCHEMA) {
+          try {
+            setupNewSchema();
+          } catch (SchemaChangeException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+    first = false;
+
     container.zeroVectors();
     if (!doAlloc()) {
@@ -265,13 +288,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    setupNewSchema();
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     if (allocationVectors != null) {
       for (ValueVector v : allocationVectors) {
@@ -400,11 +416,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch {
           // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
           ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
           cg.addExpr(expr);
-          if (buildingSchema) {
-            buildingSchema = false;
-            MaterializedField f = MaterializedField.create(outputField.getPath().getAsUnescapedPath(), Types.required(MinorType.MAP));
-            container.addOrGet(f);
-          }
         } else{
           // need to do evaluation.
           ValueVector vector = container.addOrGet(outputField, callBack);
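ProjectRecordBatch is the one operator that cannot project a zero-row first batch when complex writers are involved, since the writers' output vectors only materialize when records are actually written. The new doWork() preamble therefore drains empty batches until rows (or a terminal outcome) arrive, re-running setupNewSchema() if the schema moved along the way. The loop in isolation, with stand-in types and setupNewSchema passed as a callback:

class EmptyFirstBatchDrainer {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE, STOP }
  interface Child { Outcome next(); int recordCount(); }

  static Outcome drainToFirstRows(Child incoming, Runnable setupNewSchema) {
    Outcome next = null;
    int count = incoming.recordCount();
    while (count == 0) {
      next = incoming.next();
      if (next != Outcome.OK && next != Outcome.OK_NEW_SCHEMA) {
        return next;            // NONE or STOP: nothing to project
      }
      count = incoming.recordCount();
    }
    if (next == Outcome.OK_NEW_SCHEMA) {
      setupNewSchema.run();     // schema changed while draining
    }
    return Outcome.OK;
  }
}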
this.sv = current.getSelectionVector2(); } for (TransferPair transfer : transfers) { transfer.transfer(); } - -// for (VectorWrapper vw : this.container) { -// ValueVector.Mutator m = vw.getValueVector().getMutator(); -// m.setValueCount(outRecordCount); -// } - } private void setupSchema() { http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 25fec41..52b892e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -95,11 +95,6 @@ public class UnorderedReceiverBatch implements RecordBatch { } @Override - public IterOutcome buildSchema() throws SchemaChangeException { - return next(); - } - - @Override public int getRecordCount() { return batchLoader.getRecordCount(); } @@ -108,9 +103,8 @@ public class UnorderedReceiverBatch implements RecordBatch { public void kill(boolean sendUpstream) { if (sendUpstream) { informSenders(); - } else { - fragProvider.kill(context); } + fragProvider.kill(context); } @Override @@ -148,7 +142,7 @@ public class UnorderedReceiverBatch implements RecordBatch { batch = fragProvider.getNext(); // skip over empty batches. we do this since these are basically control messages. - while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first) { + while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) { batch = fragProvider.getNext(); } } finally { http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 7f5ab2a..66ec22f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -76,12 +76,6 @@ public class IteratorValidatorBatchIterator implements RecordBatch { } @Override - public IterOutcome buildSchema() throws SchemaChangeException { - state = incoming.buildSchema(); - return state; - } - - @Override public int getRecordCount() { validateReadState(); return incoming.getRecordCount(); http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java index e0e9d42..38c6884 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java @@ -252,7 +252,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch { http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java private long totalSizeInMemory = 0; private long highWaterMark = Long.MAX_VALUE; private int targetRecordCount; + private boolean stop = false; public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { - super(popConfig, context); + super(popConfig, context, true); this.incoming = incoming; DrillConfig config = context.getConfig(); Configuration conf = new Configuration(); @@ -193,24 +194,24 @@ public class ExternalSortBatch extends AbstractRecordBatch { incoming.cleanup(); } - @Override - public IterOutcome buildSchema() throws SchemaChangeException { - stats.startProcessing(); - try { - stats.stopProcessing(); - try { - incoming.buildSchema(); - } finally { - stats.startProcessing(); - } - for (VectorWrapper w : incoming) { - container.addOrGet(w.getField()); - } - container.buildSchema(SelectionVectorMode.NONE); - container.setRecordCount(0); - return IterOutcome.OK_NEW_SCHEMA; - } finally { - stats.stopProcessing(); + public void buildSchema() throws SchemaChangeException { + IterOutcome outcome = next(incoming); + switch (outcome) { + case OK: + case OK_NEW_SCHEMA: + for (VectorWrapper w : incoming) { + ValueVector v = container.addOrGet(w.getField()); + v.allocateNew(); + } + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(0); + return; + case STOP: + stop = true; + case NONE: + state = BatchState.DONE; + default: + return; } } @@ -228,7 +229,6 @@ public class ExternalSortBatch extends AbstractRecordBatch { } else { Stopwatch w = new Stopwatch(); w.start(); -// int count = selector.next(); int count = copier.next(targetRecordCount); if (count > 0) { long t = w.elapsed(TimeUnit.MICROSECONDS); @@ -249,7 +249,12 @@ public class ExternalSortBatch extends AbstractRecordBatch { outer: while (true) { Stopwatch watch = new Stopwatch(); watch.start(); - IterOutcome upstream = incoming.next(); + IterOutcome upstream; + if (first) { + upstream = IterOutcome.OK_NEW_SCHEMA; + } else { + upstream = next(incoming); + } if (upstream == IterOutcome.OK && sorter == null) { upstream = IterOutcome.OK_NEW_SCHEMA; } @@ -275,15 +280,15 @@ public class ExternalSortBatch extends AbstractRecordBatch { } // fall through.
case OK: - if (!first && incoming.getRecordCount() == 0) { + if (first) { + first = false; + } + if (incoming.getRecordCount() == 0) { for (VectorWrapper w : incoming) { w.clear(); } break; } - if (first) { - first = false; - } totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { @@ -336,6 +341,9 @@ public class ExternalSortBatch extends AbstractRecordBatch { } } + if (totalcount == 0) { + return IterOutcome.NONE; + } if (spillCount == 0) { Stopwatch watch = new Stopwatch(); watch.start(); http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index f77ae3d..2bb29e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.record; import java.util.Iterator; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; @@ -38,24 +39,36 @@ public abstract class AbstractRecordBatch implements protected final OperatorContext oContext; protected final OperatorStats stats; + protected BatchState state; + protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException { - super(); - this.context = context; - this.popConfig = popConfig; - this.oContext = new OperatorContext(popConfig, context, true); - this.stats = oContext.getStats(); - this.container = new VectorContainer(this.oContext); + this(popConfig, context, true, new OperatorContext(popConfig, context, true)); } - protected AbstractRecordBatch(T popConfig, FragmentContext context, OperatorContext oContext) throws OutOfMemoryException { + protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException { + this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true)); + } + + protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema, OperatorContext oContext) throws OutOfMemoryException { super(); this.context = context; this.popConfig = popConfig; this.oContext = oContext; this.stats = oContext.getStats(); this.container = new VectorContainer(this.oContext); + if (buildSchema) { + state = BatchState.BUILD_SCHEMA; + } else { + state = BatchState.FIRST; + } } + protected static enum BatchState { + BUILD_SCHEMA, // Need to build schema and return + FIRST, // This is still the first data batch + NOT_FIRST, // The first data batch has already been returned + DONE // All work is done, no more data to be sent + } @Override public Iterator> iterator() { @@ -103,7 +116,33 @@ public abstract class AbstractRecordBatch implements public final IterOutcome next() { try { stats.startProcessing(); - return innerNext(); +// if (state == BatchState.BUILD_SCHEMA) { +// buildSchema(); +// if (state == BatchState.BUILD_SCHEMA.DONE) { +// return IterOutcome.NONE; +// } else { +// state = BatchState.FIRST; +// return
IterOutcome.OK_NEW_SCHEMA; +// } +// } + switch (state) { + case BUILD_SCHEMA: { + buildSchema(); + if (state == BatchState.DONE) { + return IterOutcome.NONE; + } else { + state = BatchState.FIRST; + return IterOutcome.OK_NEW_SCHEMA; + } + } + case DONE: { + return IterOutcome.NONE; + } + default: + return innerNext(); + } + } catch (SchemaChangeException e) { + throw new DrillRuntimeException(e); } finally { stats.stopProcessing(); } @@ -116,9 +155,7 @@ public abstract class AbstractRecordBatch implements return container.getSchema(); } - @Override - public IterOutcome buildSchema() throws SchemaChangeException { - throw new UnsupportedOperationException("buildSchema() not yet implemented"); + protected void buildSchema() throws SchemaChangeException { } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 1ef0345..f895f47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -21,19 +21,18 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.SchemaChangeCallBack; public abstract class AbstractSingleRecordBatch extends AbstractRecordBatch { final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); protected final RecordBatch incoming; - private boolean first = true; - protected boolean done = false; protected boolean outOfMemory = false; protected SchemaChangeCallBack callBack = new SchemaChangeCallBack(); public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { - super(popConfig, context); + super(popConfig, context, false); this.incoming = incoming; } @@ -45,30 +44,35 @@ public abstract class AbstractSingleRecordBatch exte @Override public IterOutcome innerNext() { // Short circuit if record batch has already sent all data and is done - if (done) { + if (state == BatchState.DONE) { return IterOutcome.NONE; } IterOutcome upstream = next(incoming); - if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { + if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { do { for (VectorWrapper w : incoming) { w.clear(); } } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0); } - if (first && upstream == IterOutcome.OK) { + if ((state == BatchState.FIRST) && upstream == IterOutcome.OK) { upstream = IterOutcome.OK_NEW_SCHEMA; } switch (upstream) { case NONE: case NOT_YET: case STOP: + if (state == BatchState.FIRST) { + container.buildSchema(SelectionVectorMode.NONE); + } return upstream; case OUT_OF_MEMORY: return upstream; case OK_NEW_SCHEMA: - first = false; + if (state == BatchState.FIRST) { + state = BatchState.NOT_FIRST; + } try { stats.startSetup(); if (!setupNewSchema()) { @@ -84,9 +88,15 @@ public abstract 
class AbstractSingleRecordBatch exte } // fall through. case OK: - assert !first : "First batch should be OK_NEW_SCHEMA"; + assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA"; container.zeroVectors(); - doWork(); + IterOutcome out = doWork(); + + // since the doWork() method does not know whether there is a new schema, it will always return IterOutcome.OK if it was successful. + // But if upstream is IterOutcome.OK_NEW_SCHEMA, we should return that instead. + if (out != IterOutcome.OK) { + upstream = out; + } if (outOfMemory) { outOfMemory = false; @@ -105,13 +115,6 @@ public abstract class AbstractSingleRecordBatch exte } @Override - public IterOutcome buildSchema() throws SchemaChangeException { - incoming.buildSchema(); - setupNewSchema(); - return IterOutcome.OK_NEW_SCHEMA; - } - - @Override public void cleanup() { // logger.debug("Cleaning up."); super.cleanup(); http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 318600f..0a8ece5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -68,14 +68,6 @@ public interface RecordBatch extends VectorAccessible { */ public BatchSchema getSchema(); - - /** - * To be called out the beginning of fragment execution. This will build the schema to return downstream, and to the client - * - * @return OK_NEW_SCHEMA if succesful. - */ - public IterOutcome buildSchema() throws SchemaChangeException; - /** * Provide the number of records that are within this record count * http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index d85abd5..5bc3da1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -96,22 +96,27 @@ public class WorkManager implements Closeable { // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); eventThread.start(); - dContext.getMetrics().register( - MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()), - new Gauge() { - @Override - public Integer getValue() { - return runningFragments.size(); - } - }); - dContext.getMetrics().register( - MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()), - new Gauge() { - @Override - public Integer getValue() { - return pendingTasks.size(); - } - }); + // TODO remove try block once metrics are moved out of the singleton. For now, catch to avoid unit test failures. + try { + dContext.getMetrics().register( + MetricRegistry.name("drill.exec.work.running_fragments."
+ dContext.getEndpoint().getUserPort()), + new Gauge() { + @Override + public Integer getValue() { + return runningFragments.size(); + } + }); + dContext.getMetrics().register( + MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()), + new Gauge() { + @Override + public Integer getValue() { + return pendingTasks.size(); + } + }); + } catch (IllegalArgumentException e) { + logger.warn("Exception while registering metrics", e); + } } public WorkEventBus getWorkBus() {
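----------------------------------------------------------------------

A note on the pattern running through this patch: DRILL-1781 removes buildSchema() from the RecordBatch interface and instead has AbstractRecordBatch.next() drive a small state machine (BUILD_SCHEMA -> FIRST -> NOT_FIRST -> DONE), so each operator emits one empty OK_NEW_SCHEMA batch before any data reaches the client. What follows is a minimal, self-contained sketch of that handshake only: the IterOutcome and BatchState values and the switch in next() mirror the patch, while FastSchemaSketch, RecordBatchSketch, and the main() driver are invented stand-ins for illustration, not Drill's real classes.

public class FastSchemaSketch {

  enum IterOutcome { OK, OK_NEW_SCHEMA, NONE }

  enum BatchState {
    BUILD_SCHEMA, // need to build and send an empty schema batch first
    FIRST,        // schema sent; the next call returns the first data batch
    NOT_FIRST,    // the first data batch has already been returned
    DONE          // all work is done, no more data to be sent
  }

  // Simplified stand-in for AbstractRecordBatch; not Drill's real class.
  static abstract class RecordBatchSketch {
    protected BatchState state;

    protected RecordBatchSketch(boolean buildSchemaFirst) {
      state = buildSchemaFirst ? BatchState.BUILD_SCHEMA : BatchState.FIRST;
    }

    // Subclasses populate their output schema here; they may set state = DONE
    // if upstream turns out to be empty (as ExternalSortBatch does on NONE).
    protected void buildSchema() { }

    // Normal per-batch work, analogous to innerNext() in the patch; subclasses
    // manage the FIRST -> NOT_FIRST transition themselves.
    protected abstract IterOutcome innerNext();

    public final IterOutcome next() {
      switch (state) {
        case BUILD_SCHEMA:
          buildSchema();
          if (state == BatchState.DONE) {
            return IterOutcome.NONE; // upstream was empty, nothing to send
          }
          state = BatchState.FIRST;
          return IterOutcome.OK_NEW_SCHEMA; // empty batch carrying only schema
        case DONE:
          return IterOutcome.NONE;
        default:
          return innerNext();
      }
    }
  }

  public static void main(String[] args) {
    // A toy operator that produces two data batches after its schema batch.
    RecordBatchSketch batch = new RecordBatchSketch(true) {
      private int remaining = 2;
      @Override protected IterOutcome innerNext() {
        return remaining-- > 0 ? IterOutcome.OK : IterOutcome.NONE;
      }
    };
    for (IterOutcome out = batch.next(); out != IterOutcome.NONE; out = batch.next()) {
      System.out.println(out); // OK_NEW_SCHEMA, OK, OK
    }
  }
}

Running main() prints OK_NEW_SCHEMA followed by OK twice: the schema travels downstream in an empty first batch, which is what lets the client learn the result schema before the first data batch arrives.

----------------------------------------------------------------------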