Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1F421802F for ; Wed, 27 Jan 2016 20:37:52 +0000 (UTC) Received: (qmail 40473 invoked by uid 500); 27 Jan 2016 20:37:49 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 40440 invoked by uid 500); 27 Jan 2016 20:37:49 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 40431 invoked by uid 99); 27 Jan 2016 20:37:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jan 2016 20:37:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 604E1E00B2; Wed, 27 Jan 2016 20:37:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smp@apache.org To: commits@drill.apache.org Message-Id: <1b620176a042416a83e49b81cfdd7adc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-4196 Fix to stop returning no more data when output batch is full during merge. Date: Wed, 27 Jan 2016 20:37:49 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master 3d0b4b025 -> d70cf36ea DRILL-4196 Fix to stop returning no more data when output batch is full during merge. 
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d70cf36e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d70cf36e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d70cf36e Branch: refs/heads/master Commit: d70cf36ea3e108a2fe96bf6f1e2b71eace8b4adf Parents: 3d0b4b0 Author: Amit Hadke Authored: Tue Jan 26 16:52:25 2016 -0800 Committer: Steven Phillips Committed: Wed Jan 27 12:37:04 2016 -0800 ---------------------------------------------------------------------- .../exec/physical/impl/join/JoinStatus.java | 12 +++++++ .../exec/physical/impl/join/JoinTemplate.java | 5 ++- .../impl/join/TestMergeJoinAdvanced.java | 34 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 9e31763..8e48515 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -42,6 +42,7 @@ public final class JoinStatus { private boolean allowMarking; public boolean ok = true; + public boolean hasMoreData = false; public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) { this.left = left; @@ -120,6 +121,14 @@ public final class JoinStatus { return allowMarking; } + public boolean isHasMoreData() { + return hasMoreData; + } + + public void setHasMoreData(boolean hasMoreData) { + this.hasMoreData = hasMoreData; + } + /** * Return state of join based on status of left and 
right iterator. * @return @@ -132,6 +141,9 @@ public final class JoinStatus { if (!ok) { return JoinOutcome.FAILURE; } + if (hasMoreData) { + return JoinOutcome.BATCH_RETURNED; + } if (bothMatches(IterOutcome.NONE) || (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || (joinType == JoinRelType.LEFT && getLeftStatus() == IterOutcome.NONE) || http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index 43cbf71..37cf0ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -42,12 +42,13 @@ public abstract class JoinTemplate implements JoinWorker { */ public final boolean doJoin(final JoinStatus status) { final boolean isLeftJoin = (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT); - + status.setHasMoreData(false); while (!status.isOutgoingBatchFull()) { if (status.right.finished()) { if (isLeftJoin) { while (!status.left.finished()) { if (status.isOutgoingBatchFull()) { + status.setHasMoreData(true); return true; } doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition()); @@ -86,6 +87,7 @@ public abstract class JoinTemplate implements JoinWorker { if (status.isOutgoingBatchFull()) { // Leave iterators at their current positions and markers. // Don't mark on all subsequent doJoin iterations. 
+ status.setHasMoreData(true); status.disableMarking(); return true; } @@ -97,6 +99,7 @@ public abstract class JoinTemplate implements JoinWorker { doCopyRight(status.right.getCurrentPosition(), status.getOutPosition()); status.incOutputPos(); if (status.isOutgoingBatchFull()) { + status.setHasMoreData(true); status.disableMarking(); return true; } http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java index ac6ac89..3e0deb2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java @@ -213,4 +213,38 @@ public class TestMergeJoinAdvanced extends BaseTestQuery { .baselineValues(202452l) .go(); } + + @Test + public void testDrill4196() throws Exception { + final String leftSide = BaseTestQuery.getTempDir("merge-join-left.json"); + final String rightSide = BaseTestQuery.getTempDir("merge-join-right.json"); + final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(new File(leftSide))); + final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(new File(rightSide))); + + // output batch is 32k, create 60k left batch + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 9999, 9999)); + for (int i=0; i < 6000; ++i) { + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10000, 10000)); + } + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10001, 10001)); + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10002, 10002)); + + // Keep all values same. 
Join will consume the entire right side. + for (int i=0; i < 800; ++i) { + rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10000, 10000)); + } + + leftWriter.close(); + rightWriter.close(); + + final String query1 = String.format("select count(*) c1 from dfs_test.`%s` L %s join dfs_test.`%s` R on L.k=R.k1", + leftSide, "inner", rightSide); + testBuilder() + .sqlQuery(query1) + .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false") + .unOrdered() + .baselineColumns("c1") + .baselineValues(6000*800L) + .go(); + } }