drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject drill git commit: DRILL-1887: Add code-gen for explicitly comparing null values for hash table keys and make corresponding change for hash aggregation and hash join.
Date Fri, 19 Dec 2014 16:50:07 GMT
Repository: drill
Updated Branches:
  refs/heads/master ac6e913bf -> e715a2ce4


DRILL-1887: Add code-gen for explicitly comparing null values for hash table keys and make
corresponding change for hash aggregation and hash join.

Add test cases for joins and aggregations on nullable columns.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e715a2ce
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e715a2ce
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e715a2ce

Branch: refs/heads/master
Commit: e715a2ce40d4908b82f9518615d9f8f3a994cca2
Parents: ac6e913
Author: Aman Sinha <asinha@maprtech.com>
Authored: Thu Dec 18 15:01:30 2014 -0800
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Thu Dec 18 22:37:40 2014 -0800

----------------------------------------------------------------------
 .../impl/aggregate/HashAggTemplate.java         |   3 +-
 .../physical/impl/common/ChainedHashTable.java  |  16 ++-
 .../exec/physical/impl/join/HashJoinBatch.java  |   3 +-
 .../java/org/apache/drill/TestAggNullable.java  |  80 +++++++++++
 .../java/org/apache/drill/TestJoinNullable.java | 139 +++++++++++++++++++
 .../src/test/resources/jsoninput/nullable1.json |   8 ++
 .../src/test/resources/jsoninput/nullable2.json |  16 +++
 7 files changed, 261 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index d25a952..d7cf904 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -252,7 +252,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
 
-    ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing) ;
+    ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, incoming,
+        null /* no incoming probe */, outgoing, true /* nulls are equal */) ;
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
 
     numGroupByOutFields = groupByOutFieldIds.length;

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 4b80781..0502f7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -110,13 +110,15 @@ public class ChainedHashTable {
   private final RecordBatch incomingBuild;
   private final RecordBatch incomingProbe;
   private final RecordBatch outgoing;
+  private final boolean areNullsEqual;
 
   public ChainedHashTable(HashTableConfig htConfig,
                           FragmentContext context,
                           BufferAllocator allocator,
                           RecordBatch incomingBuild,
                           RecordBatch incomingProbe,
-                          RecordBatch outgoing)  {
+                          RecordBatch outgoing,
+                          boolean areNullsEqual)  {
 
     this.htConfig = htConfig;
     this.context = context;
@@ -124,6 +126,7 @@ public class ChainedHashTable {
     this.incomingBuild = incomingBuild;
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
+    this.areNullsEqual = areNullsEqual;
   }
 
   public HashTable createAndSetupHashTable (TypedFieldId[] outKeyFieldIds) throws ClassTransformationException, IOException, SchemaChangeException {
@@ -229,12 +232,21 @@ public class ChainedHashTable {
       ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]);
       HoldingContainer right = cg.addExpr(vvrExpr, false);
 
+      JConditional jc;
+
+      // codegen for nullable columns if nulls are not equal
+      if (!areNullsEqual && left.isOptional() && right.isOptional()) {
+        jc = cg.getEvalBlock()._if(left.getIsSet().eq(JExpr.lit(0)).
+            cand(right.getIsSet().eq(JExpr.lit(0))));
+        jc._then()._return(JExpr.FALSE);
+      }
+
       // next we wrap the two comparison sides and add the expression block for the comparison.
       LogicalExpression f = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry());
       HoldingContainer out = cg.addExpr(f, false);
 
       // check if two values are not equal (comparator result != 0)
-      JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+      jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 
       jc._then()._return(JExpr.FALSE);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7b3751b..5deb67f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -307,7 +307,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
         // Create the chained hash table
-        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
+        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(),
+            this.right, this.left, null, false /* nulls are not equal */);
         hashTable = ht.createAndSetupHashTable(null);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
new file mode 100644
index 0000000..4c9617b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.util.TestTools;
+import org.junit.Test;
+
+public class TestAggNullable extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAggNullable.class);
+
+  static final String WORKING_PATH = TestTools.getWorkingPath();
+  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+  private static void enableAggr(boolean ha, boolean sa) throws Exception {
+
+    test(String.format("alter session set `planner.enable_hashagg` = %s", ha ? "true":"false"));
+    test(String.format("alter session set `planner.enable_streamagg` = %s", sa ? "true":"false"));
+    test("alter session set `planner.slice_target` = 1");
+  }
+
+  @Test  // HashAgg on nullable columns
+  public void testHashAggNullableColumns() throws Exception {
+    String query1 = String.format("select t2.b2 from dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                    " group by t2.b2", TEST_RES_PATH);
+    String query2 = String.format("select t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+        " group by t2.a2, t2.b2", TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 2;
+
+    enableAggr(true, false);
+    actualRecordCount = testSql(query1);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+
+    expectedRecordCount = 4;
+    actualRecordCount = testSql(query2);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test  // StreamingAgg on nullable columns
+  public void testStreamAggNullableColumns() throws Exception {
+    String query1 = String.format("select t2.b2 from dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                    " group by t2.b2", TEST_RES_PATH);
+    String query2 = String.format("select t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+        " group by t2.a2, t2.b2", TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 2;
+
+    enableAggr(false, true);
+    actualRecordCount = testSql(query1);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+
+    expectedRecordCount = 4;
+    actualRecordCount = testSql(query2);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
new file mode 100644
index 0000000..c49da6c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.util.TestTools;
+import org.junit.Test;
+
+public class TestJoinNullable extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJoinNullable.class);
+
+  static final String WORKING_PATH = TestTools.getWorkingPath();
+  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+  private static void enableJoin(boolean hj, boolean mj) throws Exception {
+
+    test(String.format("alter session set `planner.enable_hashjoin` = %s", hj ? "true":"false"));
+    test(String.format("alter session set `planner.enable_mergejoin` = %s", mj ? "true":"false"));
+    test("alter session set `planner.slice_target` = 1");
+  }
+
+  @Test  // InnerJoin on nullable cols, HashJoin
+  public void testHashInnerJoinOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1, " +
+                   " dfs_test.`%s/jsoninput/nullable2.json` t2 where t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+    int actualRecordCount;
+    int expectedRecordCount = 1;
+
+    enableJoin(true, false);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // InnerJoin on nullable cols, MergeJoin
+  public void testMergeInnerJoinOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1, " +
+                   " dfs_test.`%s/jsoninput/nullable2.json` t2 where t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+    int actualRecordCount;
+    int expectedRecordCount = 1;
+
+    enableJoin(false, true);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // LeftOuterJoin on nullable cols, HashJoin
+  public void testHashLOJOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1 " +
+                      " left outer join dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                      " on t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 2;
+
+    enableJoin(true, false);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // RightOuterJoin on nullable cols, HashJoin
+  public void testHashROJOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1 " +
+                      " right outer join dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                      " on t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 4;
+
+    enableJoin(true, false);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // FullOuterJoin on nullable cols, HashJoin
+  public void testHashFOJOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1 " +
+                      " full outer join dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                      " on t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 5;
+
+    enableJoin(true, false);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // LeftOuterJoin on nullable cols, MergeJoin
+  public void testMergeLOJOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1 " +
+                      " left outer join dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                      " on t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 2;
+
+    enableJoin(false, true);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @Test // RightOuterJoin on nullable cols, MergeJoin
+  public void testMergeROJOnNullableColumns() throws Exception {
+    String query = String.format("select t1.a1, t1.b1, t2.a2, t2.b2 from dfs_test.`%s/jsoninput/nullable1.json` t1 " +
+                      " right outer join dfs_test.`%s/jsoninput/nullable2.json` t2 " +
+                      " on t1.b1 = t2.b2", TEST_RES_PATH ,TEST_RES_PATH);
+
+    int actualRecordCount;
+    int expectedRecordCount = 4;
+
+    enableJoin(false, true);
+    actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexepcted number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/test/resources/jsoninput/nullable1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/nullable1.json b/exec/java-exec/src/test/resources/jsoninput/nullable1.json
new file mode 100644
index 0000000..8f1f0ac
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/nullable1.json
@@ -0,0 +1,8 @@
+{
+ "a1":1,
+ "b1":"abc"
+}
+{
+ "a1":2,
+ "b1":null
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e715a2ce/exec/java-exec/src/test/resources/jsoninput/nullable2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/nullable2.json b/exec/java-exec/src/test/resources/jsoninput/nullable2.json
new file mode 100644
index 0000000..6592c65
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/nullable2.json
@@ -0,0 +1,16 @@
+{
+ "a2":1,
+ "b2":null
+}
+{
+ "a2":2,
+ "b2":null
+}
+{
+ "a2":2,
+ "b2":"abc"
+}
+{
+ "a2":3,
+ "b2":null
+}


Mime
View raw message