Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7035F18672 for ; Fri, 15 May 2015 00:20:16 +0000 (UTC) Received: (qmail 68369 invoked by uid 500); 15 May 2015 00:20:16 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 68317 invoked by uid 500); 15 May 2015 00:20:16 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 68299 invoked by uid 99); 15 May 2015 00:20:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 May 2015 00:20:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 01699E116B; Fri, 15 May 2015 00:20:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smp@apache.org To: commits@drill.apache.org Date: Fri, 15 May 2015 00:20:15 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] drill git commit: DRILL-3088: Kill and cleanup remaining batches in left upstream in NestedLoopJoin Repository: drill Updated Branches: refs/heads/master f7f6efc52 -> 16ef62851 DRILL-3088: Kill and cleanup remaining batches in left upstream in NestedLoopJoin Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/80e3f74f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/80e3f74f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/80e3f74f Branch: refs/heads/master Commit: 80e3f74f3951d84c3209e6910440f2b0378cbd6f Parents: f7f6efc Author: Steven Phillips Authored: Thu May 14 15:44:52 
2015 -0700 Committer: Steven Phillips Committed: Thu May 14 15:48:17 2015 -0700 ---------------------------------------------------------------------- .../physical/impl/join/NestedLoopJoinBatch.java | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/80e3f74f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index de0d8e5..9bcea60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.server.options.DrillConfigIterator.Iter; import org.apache.drill.exec.vector.AllocationHelper; import com.google.common.base.Preconditions; @@ -143,6 +144,8 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch // exit if we have an empty left batch if (leftUpstream == IterOutcome.NONE) { + // inform upstream that we don't need anymore data and make sure we clean up any batches already in queue + killAndDrainRight(); return IterOutcome.NONE; } @@ -193,6 +196,23 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch return (outputRecords > 0) ? 
IterOutcome.OK : IterOutcome.NONE; } + private void killAndDrainRight() { + if (!hasMore(rightUpstream)) { + return; + } + right.kill(true); + while (hasMore(rightUpstream)) { + for (VectorWrapper wrapper : right) { + wrapper.getValueVector().clear(); + } + rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); + } + } + + private boolean hasMore(IterOutcome outcome) { + return outcome == IterOutcome.OK || outcome == IterOutcome.OK_NEW_SCHEMA; + } + /** * Method generates the runtime code needed for NLJ. Other than the setup method to set the input and output value * vector references we implement two more methods