drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject drill git commit: DRILL-4196 Fix to stop returning no more data when output batch is full during merge.
Date Wed, 27 Jan 2016 20:37:49 GMT
Repository: drill
Updated Branches:
  refs/heads/master 3d0b4b025 -> d70cf36ea


DRILL-4196 Fix to stop returning no more data when output batch is full during merge.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d70cf36e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d70cf36e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d70cf36e

Branch: refs/heads/master
Commit: d70cf36ea3e108a2fe96bf6f1e2b71eace8b4adf
Parents: 3d0b4b0
Author: Amit Hadke <amit.hadke@gmail.com>
Authored: Tue Jan 26 16:52:25 2016 -0800
Committer: Steven Phillips <steven@dremio.com>
Committed: Wed Jan 27 12:37:04 2016 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/JoinStatus.java     | 12 +++++++
 .../exec/physical/impl/join/JoinTemplate.java   |  5 ++-
 .../impl/join/TestMergeJoinAdvanced.java        | 34 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 9e31763..8e48515 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -42,6 +42,7 @@ public final class JoinStatus {
   private boolean allowMarking;
 
   public boolean ok = true;
+  public boolean hasMoreData = false;
 
   public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) {
     this.left = left;
@@ -120,6 +121,14 @@ public final class JoinStatus {
     return allowMarking;
   }
 
+  public boolean isHasMoreData() {
+    return hasMoreData;
+  }
+
+  public void setHasMoreData(boolean hasMoreData) {
+    this.hasMoreData = hasMoreData;
+  }
+
   /**
    * Return state of join based on status of left and right iterator.
    * @return
@@ -132,6 +141,9 @@ public final class JoinStatus {
     if (!ok) {
       return JoinOutcome.FAILURE;
     }
+    if (hasMoreData) {
+      return JoinOutcome.BATCH_RETURNED;
+    }
     if (bothMatches(IterOutcome.NONE) ||
       (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
       (joinType == JoinRelType.LEFT && getLeftStatus() == IterOutcome.NONE) ||

http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index 43cbf71..37cf0ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -42,12 +42,13 @@ public abstract class JoinTemplate implements JoinWorker {
    */
   public final boolean doJoin(final JoinStatus status) {
     final boolean isLeftJoin = (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT);
-
+    status.setHasMoreData(false);
     while (!status.isOutgoingBatchFull()) {
       if (status.right.finished()) {
         if (isLeftJoin) {
           while (!status.left.finished()) {
             if (status.isOutgoingBatchFull()) {
+              status.setHasMoreData(true);
               return true;
             }
             doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition());
@@ -86,6 +87,7 @@ public abstract class JoinTemplate implements JoinWorker {
           if (status.isOutgoingBatchFull()) {
             // Leave iterators at their current positions and markers.
             // Don't mark on all subsequent doJoin iterations.
+            status.setHasMoreData(true);
             status.disableMarking();
             return true;
           }
@@ -97,6 +99,7 @@ public abstract class JoinTemplate implements JoinWorker {
               doCopyRight(status.right.getCurrentPosition(), status.getOutPosition());
               status.incOutputPos();
               if (status.isOutgoingBatchFull()) {
+                status.setHasMoreData(true);
                 status.disableMarking();
                 return true;
               }

http://git-wip-us.apache.org/repos/asf/drill/blob/d70cf36e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index ac6ac89..3e0deb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -213,4 +213,38 @@ public class TestMergeJoinAdvanced extends BaseTestQuery {
       .baselineValues(202452l)
       .go();
   }
+
+  @Test
+  public void testDrill4196() throws Exception {
+    final String leftSide = BaseTestQuery.getTempDir("merge-join-left.json");
+    final String rightSide = BaseTestQuery.getTempDir("merge-join-right.json");
+    final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(new File(leftSide)));
+    final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(new File(rightSide)));
+
+    // output batch is 32k, create 60k left batch
+    leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 9999, 9999));
+    for (int i=0; i < 6000; ++i) {
+      leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10000, 10000));
+    }
+    leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10001, 10001));
+    leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10002, 10002));
+
+    // Keep all values same. Join will consume entire right side.
+    for (int i=0; i < 800; ++i) {
+      rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10000, 10000));
+    }
+
+    leftWriter.close();
+    rightWriter.close();
+
+    final String query1 = String.format("select count(*) c1 from dfs_test.`%s` L %s join dfs_test.`%s` R on L.k=R.k1",
+      leftSide, "inner", rightSide);
+    testBuilder()
+      .sqlQuery(query1)
+      .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+      .unOrdered()
+      .baselineColumns("c1")
+      .baselineValues(6000*800L)
+      .go();
+  }
 }


Mime
View raw message