tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject git commit: TAJO-972: Broadcast join with left outer join returns duplicated rows.(Hyoungjun Kim via jaehwa)
Date Wed, 23 Jul 2014 02:28:12 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 2a6b38e84 -> a5de83720


TAJO-972: Broadcast join with left outer join returns duplicated rows.(Hyoungjun Kim via jaehwa)

Closes #89


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a5de8372
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a5de8372
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a5de8372

Branch: refs/heads/master
Commit: a5de837209a8d6d9685ad1aa8132b3b4ecd99727
Parents: 2a6b38e
Author: blrunner <blrunner@apache.org>
Authored: Wed Jul 23 11:26:42 2014 +0900
Committer: blrunner <blrunner@apache.org>
Committed: Wed Jul 23 11:26:42 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../engine/planner/global/GlobalPlanner.java    |  4 +-
 .../planner/global/TestBroadcastJoinPlan.java   | 94 ++++++++++++--------
 .../tajo/engine/query/TestJoinBroadcast.java    | 47 +++++++++-
 4 files changed, 104 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8a1aae6..2be9b26 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-972: Broadcast join with left outer join returns duplicated rows.
+    (Hyoungjun Kim via jaehwa)
+
     TAJO-666: java.nio.BufferOverflowException occurs when the query includes an order by
     clause on a TEXT column. (Mai Hai Thanh via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 69ecd02..2daf799 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -293,7 +293,7 @@ public class GlobalPlanner {
       // Checking Left Side of Join
       if (ScanNode.isScanNode(leftNode)) {
         ScanNode scanNode = (ScanNode)leftNode;
-        if (getTableVolume(scanNode) >= broadcastThreshold) {
+        if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) {
           numLargeTables++;
         } else {
           leftBroadcast = true;
@@ -306,7 +306,7 @@ public class GlobalPlanner {
       // Checking Right Side OF Join
       if (ScanNode.isScanNode(rightNode)) {
         ScanNode scanNode = (ScanNode)rightNode;
-        if (getTableVolume(scanNode) >= broadcastThreshold) {
+        if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) {
           numLargeTables++;
         } else {
           rightBroadcast = true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index fd07ae4..ec39609 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -495,11 +495,13 @@ public class TestBroadcastJoinPlan {
 
     // ((((default.small1 ⟕ default.small2) ⟕ default.small3) ⟕ default.large1) ⟕ default.large2)
     /*
-    |-eb_1402495213549_0000_000007
-       |-eb_1402495213549_0000_000006       (GROUP BY)
-          |-eb_1402495213549_0000_000005    (JOIN)
-             |-eb_1402495213549_0000_000004 (LEAF, large2)
-             |-eb_1402495213549_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1)
+    |-eb_1406022243130_0000_000009
+       |-eb_1406022243130_0000_000008
+          |-eb_1406022243130_0000_000007       (join)
+             |-eb_1406022243130_0000_000006    (scan large2)
+             |-eb_1406022243130_0000_000005    (join)
+                |-eb_1406022243130_0000_000004 (scan large1)
+                |-eb_1406022243130_0000_000003 (scan small1, broadcast join small2, small3)
      */
 
     ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
@@ -508,9 +510,9 @@ public class TestBroadcastJoinPlan {
       ExecutionBlock eb = ebCursor.nextBlock();
       if(index == 0) {
         Collection<String> broadcastTables = eb.getBroadcastTables();
-        assertEquals(3, broadcastTables.size());
+        assertEquals(2, broadcastTables.size());
 
-        assertTrue(broadcastTables.contains("default.small1"));
+        assertTrue(!broadcastTables.contains("default.small1"));
         assertTrue(broadcastTables.contains("default.small2"));
         assertTrue(broadcastTables.contains("default.small3"));
       } else if(index == 1 || index == 2 || index == 3) {
@@ -520,7 +522,7 @@ public class TestBroadcastJoinPlan {
       index++;
     }
 
-    assertEquals(5, index);
+    assertEquals(7, index);
   }
 
   @Test
@@ -712,9 +714,9 @@ public class TestBroadcastJoinPlan {
     globalPlanner.build(masterPlan);
 
     /*
-    |-eb_1402500846700_0000_000007
-       |-eb_1402500846700_0000_000006
-          |-eb_1402500846700_0000_000005 (LEAF, broadcast join small1, small2, small3)
+    |-eb_1406022971444_0000_000005
+       |-eb_1406022971444_0000_000004     (group by)
+          |-eb_1406022971444_0000_000003  (scan small1, broadcast join small2, small3)
     */
 
     ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
@@ -735,7 +737,10 @@ public class TestBroadcastJoinPlan {
         assertEquals("default.small2", scanNode.getCanonicalName());
 
         Collection<String> broadcastTables = eb.getBroadcastTables();
-        assertEquals(3, broadcastTables.size());
+        assertEquals(2, broadcastTables.size());
+
+        assertTrue(broadcastTables.contains("default.small2"));
+        assertTrue(broadcastTables.contains("default.small3"));
       } else if(index == 1) {
         Collection<String> broadcastTables = eb.getBroadcastTables();
         assertEquals(0, broadcastTables.size());
@@ -769,9 +774,11 @@ public class TestBroadcastJoinPlan {
 
     //(((default.small1 ⟕ default.small2) ⟕ default.large1) ⟕ default.small3)
     /*
-     |-eb_1402642709028_0000_000005
-       |-eb_1402642709028_0000_000004    (GROUP BY)
-          |-eb_1402642709028_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1)
+    |-eb_1406023347983_0000_000007
+       |-eb_1406023347983_0000_000006
+          |-eb_1406023347983_0000_000005    (join, broadcast small3)
+             |-eb_1406023347983_0000_000004 (scan large1)
+             |-eb_1406023347983_0000_000003 (scan small1, broadcast join small2)
      */
 
     ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
@@ -780,19 +787,20 @@ public class TestBroadcastJoinPlan {
       ExecutionBlock eb = ebCursor.nextBlock();
       if(index == 0) {
         Collection<String> broadcastTables = eb.getBroadcastTables();
-        assertEquals(3, broadcastTables.size());
-
-        assertTrue(broadcastTables.contains("default.small1"));
+        assertEquals(1, broadcastTables.size());
         assertTrue(broadcastTables.contains("default.small2"));
+      } else if (index == 2) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
         assertTrue(broadcastTables.contains("default.small3"));
-      } else if(index == 1 || index == 2 || index == 3) {
+      } else if(index == 1 || index == 3) {
         Collection<String> broadcastTables = eb.getBroadcastTables();
         assertEquals(0, broadcastTables.size());
       }
       index++;
     }
 
-    assertEquals(3, index);
+    assertEquals(5, index);
   }
 
   @Test
@@ -820,11 +828,13 @@ public class TestBroadcastJoinPlan {
     // ((((default.small1 ⟕ default.small2) ⟕ default.large1) ⟕ default.large2) ⟕ default.small3)
 
     /*
-    |-eb_1404125948432_0000_000007
-       |-eb_1404125948432_0000_000006
-          |-eb_1404125948432_0000_000005     (JOIN broadcast small3)
-             |-eb_1404125948432_0000_000004  (LEAF, scan large2)
-             |-eb_1404125948432_0000_000003  (LEAF, scan large1, broadcast small1, small2)
+    |-eb_1406023537578_0000_000009
+       |-eb_1406023537578_0000_000008
+          |-eb_1406023537578_0000_000007        (join, broadcast small3)
+             |-eb_1406023537578_0000_000006     (scan large2)
+             |-eb_1406023537578_0000_000005     (join)
+                |-eb_1406023537578_0000_000004  (scan large1)
+                |-eb_1406023537578_0000_000003  (scan small1, broadcast join small2)
     */
     ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
     int index = 0;
@@ -835,26 +845,34 @@ public class TestBroadcastJoinPlan {
         assertEquals(NodeType.JOIN, node.getType());
         JoinNode joinNode = (JoinNode)node;
 
-        JoinNode joinNode2 = joinNode.getLeftChild();
+        ScanNode scanNode1 = joinNode.getLeftChild();
         ScanNode scanNode2 = joinNode.getRightChild();
-        assertEquals("default.large1", scanNode2.getCanonicalName());
-
-        ScanNode scanNode3 = joinNode2.getLeftChild();
-        ScanNode scanNode4 = joinNode2.getRightChild();
-        assertEquals("default.small1", scanNode3.getCanonicalName());
-        assertEquals("default.small2", scanNode4.getCanonicalName());
+        assertEquals("default.small1", scanNode1.getCanonicalName());
+        assertEquals("default.small2", scanNode2.getCanonicalName());
 
         Collection<String> broadcastTables = eb.getBroadcastTables();
-        assertEquals(2, broadcastTables.size());
+        assertEquals(1, broadcastTables.size());
+        assertTrue(broadcastTables.contains("default.small2"));
       } else if (index == 1) {
         LogicalNode node = eb.getPlan();
         assertEquals(NodeType.SCAN, node.getType());
-        ScanNode scanNode = (ScanNode)node;
+        ScanNode scanNode = (ScanNode) node;
+        assertEquals("default.large1", scanNode.getCanonicalName());
+
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(0, broadcastTables.size());
+      } else if (index == 2) {
+        LogicalNode node = eb.getPlan();
+        assertEquals(NodeType.JOIN, node.getType());
+      } else if (index == 3) {
+        LogicalNode node = eb.getPlan();
+        assertEquals(NodeType.SCAN, node.getType());
+        ScanNode scanNode = (ScanNode) node;
         assertEquals("default.large2", scanNode.getCanonicalName());
 
         Collection<String> broadcastTables = eb.getBroadcastTables();
         assertEquals(0, broadcastTables.size());
-      } else if(index == 2) {
+      } else if(index == 4) {
         LogicalNode node = eb.getPlan();
         assertEquals(NodeType.GROUP_BY, node.getType());
 
@@ -866,8 +884,8 @@ public class TestBroadcastJoinPlan {
 
         ScanNode scanNode2 = joinNode1.getLeftChild();
         ScanNode scanNode3 = joinNode1.getRightChild();
-        assertTrue(scanNode2.getCanonicalName().indexOf("0000_000003") > 0);
-        assertTrue(scanNode3.getCanonicalName().indexOf("0000_000004") > 0);
+        assertTrue(scanNode2.getCanonicalName().indexOf("0000_000005") > 0);
+        assertTrue(scanNode3.getCanonicalName().indexOf("0000_000006") > 0);
 
         Collection<String> broadcastTables = eb.getBroadcastTables();
         assertEquals(1, broadcastTables.size());
@@ -875,7 +893,7 @@ public class TestBroadcastJoinPlan {
       index++;
     }
 
-    assertEquals(5, index);
+    assertEquals(7, index);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/a5de8372/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index e01b3c5..9cc65bc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.Int4Datum;
@@ -32,11 +33,9 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -576,4 +575,44 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
     appender.flush();
     appender.close();
   }
+
+  @Test
+  public final void testLeftOuterJoinLeftSideSmallTable() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    String[] data = new String[]{ "1000000|a", "1000001|b", "2|c", "3|d", "4|e" };
+    TajoTestingCluster.createTable("table1", schema, tableOptions, data, 1);
+
+    data = new String[10000];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i + "|" + "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable" + i;
+    }
+    TajoTestingCluster.createTable("table_large", schema, tableOptions, data, 2);
+
+    try {
+      ResultSet res = executeString(
+          "select a.id, b.name from table1 a left outer join table_large b on a.id = b.id order by a.id"
+      );
+
+      String expected = "id,name\n" +
+          "-------------------------------\n" +
+          "2,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable2\n" +
+          "3,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable3\n" +
+          "4,this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable4\n" +
+          "1000000,null\n" +
+          "1000001,null\n";
+
+      assertEquals(expected, resultSetToString(res));
+
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE table1 PURGE").close();
+      executeString("DROP TABLE table_large PURGE").close();
+    }
+  }
 }


Mime
View raw message