drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [3/3] drill git commit: DRILL-3381: Add option to distribute on the partition keys when doing CTAS with partitioning
Date Fri, 26 Jun 2015 01:49:53 GMT
DRILL-3381: Add option to distribute on the partition keys when doing CTAS with partitioning


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7c22366
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7c22366
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7c22366

Branch: refs/heads/master
Commit: c7c2236623f8cd37d367b4dcc0c001f08bf044e5
Parents: c6978a5
Author: Steven Phillips <smp@apache.org>
Authored: Thu Jun 25 17:26:14 2015 -0700
Committer: Steven Phillips <smp@apache.org>
Committed: Thu Jun 25 17:55:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +
 .../exec/planner/physical/WriterPrule.java      | 20 ++++++-
 .../server/options/SystemOptionManager.java     |  1 +
 .../apache/drill/TestCTASPartitionFilter.java   | 61 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 8ea90e0..140e9a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -251,4 +251,7 @@ public interface ExecConstants {
 
   public static final String USE_OLD_ASSIGNMENT_CREATOR = "exec.schedule.assignment.old";
   public static final OptionValidator USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR = new BooleanValidator(USE_OLD_ASSIGNMENT_CREATOR, false);
+
+  public static final String CTAS_PARTITIONING_HASH_DISTRIBUTE = "store.partition.hash_distribute";
+  public static final BooleanValidator CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR = new BooleanValidator(CTAS_PARTITIONING_HASH_DISTRIBUTE, false);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index e191423..d4e3d0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.planner.physical;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.common.DrillWriterRelBase;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -31,6 +33,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
 
 import java.util.List;
 
@@ -47,8 +51,13 @@ public class WriterPrule extends Prule{
     final DrillWriterRel writer = call.rel(0);
     final RelNode input = call.rel(1);
 
-    final RelCollation collation = getCollation(writer.getPartitionKeys());
-    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation);
+    final List<Integer> keys = writer.getPartitionKeys();
+    final RelCollation collation = getCollation(keys);
+    final boolean hashDistribute = PrelUtil.getPlannerSettings(call.getPlanner()).getOptions().getOption(ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR);
+    final RelTraitSet traits = hashDistribute ?
+        input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(getDistribution(keys)) :
+        input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation);
+
     final RelNode convertedInput = convert(input, traits);
 
     if (!new WriteTraitPull(call).go(writer, convertedInput)) {
@@ -67,6 +76,13 @@ public class WriterPrule extends Prule{
     return RelCollationImpl.of(fields);
   }
 
+  private DrillDistributionTrait getDistribution(List<Integer> keys) {
+    List<DistributionField> fields = Lists.newArrayList();
+    for (int key : keys) {
+      fields.add(new DistributionField(key));
+    }
+    return new DrillDistributionTrait(DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(fields));
+  }
 
   private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index abd2212..2d41740 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -107,6 +107,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.AVERAGE_FIELD_WIDTH,
       ExecConstants.NEW_VIEW_DEFAULT_PERMS_VALIDATOR,
       ExecConstants.USE_OLD_ASSIGNMENT_CREATOR_VALIDATOR,
+      ExecConstants.CTAS_PARTITIONING_HASH_DISTRIBUTE_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
       QueryClassLoader.JAVA_COMPILER_DEBUG,

http://git-wip-us.apache.org/repos/asf/drill/blob/c7c22366/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
new file mode 100644
index 0000000..3943426
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCTASPartitionFilter extends PlanTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCTASPartitionFilter.class);
+
+  static final String WORKING_PATH = TestTools.getWorkingPath();
+  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+  private static void testExcludeFilter(String query, int expectedNumFiles,
+      String excludedFilterPattern, int expectedRowCount) throws Exception {
+    int actualRowCount = testSql(query);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    testPlanMatchingPatterns(query, new String[]{numFilesPattern}, new String[]{excludedFilterPattern});
+  }
+
+  @Test
+  public void withDistribution() throws Exception {
+    test("alter session set `planner.slice_target` = 1");
+    test("alter session set `store.partition.hash_distribute` = true");
+    test("use dfs_test.tmp");
+    test(String.format("create table orders_distribution partition by (o_orderpriority) as select * from dfs_test.`%s/multilevel/parquet`", TEST_RES_PATH));
+    String query = "select * from orders_distribution where o_orderpriority = '1-URGENT'";
+    testExcludeFilter(query, 1, "Filter", 24);
+  }
+
+  @Test
+  public void withoutDistribution() throws Exception {
+    test("alter session set `planner.slice_target` = 1");
+    test("alter session set `store.partition.hash_distribute` = false");
+    test("use dfs_test.tmp");
+    test(String.format("create table orders_no_distribution partition by (o_orderpriority) as select * from dfs_test.`%s/multilevel/parquet`", TEST_RES_PATH));
+    String query = "select * from orders_no_distribution where o_orderpriority = '1-URGENT'";
+    testExcludeFilter(query, 2, "Filter", 24);
+  }
+}
\ No newline at end of file


Mime
View raw message