drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [8/8] drill git commit: DRILL-2209 Insert ProjectOperator with MuxExchange
Date Mon, 09 Mar 2015 08:23:55 GMT
DRILL-2209 Insert ProjectOperator with MuxExchange


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f658a3c5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f658a3c5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f658a3c5

Branch: refs/heads/master
Commit: f658a3c513ddf7f2d1b0ad7aa1f3f65049a594fe
Parents: 59aae34
Author: Yuliya Feldman <yfeldman@maprtech.com>
Authored: Thu Feb 12 19:32:11 2015 -0800
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Sun Mar 8 22:49:18 2015 -0700

----------------------------------------------------------------------
 .../physical/HashToRandomExchangePrel.java      |   2 +
 .../drill/exec/planner/physical/PrelUtil.java   |   7 +
 .../visitor/InsertLocalExchangeVisitor.java     |  84 +++++++++---
 .../exec/physical/impl/TestLocalExchange.java   | 128 ++++++++++++++++++-
 4 files changed, 204 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f658a3c5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index 372c75d..6826e46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -39,6 +39,7 @@ import org.eigenbase.relopt.RelTraitSet;
 
 public class HashToRandomExchangePrel extends ExchangePrel {
 
+
   private final List<DistributionField> fields;
 
   public HashToRandomExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
List<DistributionField> fields) {
@@ -93,6 +94,7 @@ public class HashToRandomExchangePrel extends ExchangePrel {
       return childPOP;
     }
 
+    // TODO - refactor to different exchange name
     HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields,
getChild().getRowType()));
     return creator.addMetadata(this, g);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f658a3c5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index 1adc54f..aa835e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -58,6 +58,8 @@ import com.google.common.collect.Sets;
 
 public class PrelUtil {
 
+  public static final String HASH_EXPR_NAME = "E_X_P_R_H_A_S_H_F_I_E_L_D";
+
   public static List<Ordering> getOrdering(RelCollation collation, RelDataType rowType)
{
     List<Ordering> orderExpr = Lists.newArrayList();
 
@@ -79,6 +81,11 @@ public class PrelUtil {
 
     final List<String> childFields = rowType.getFieldNames();
 
+    // If we already included a field with hash - no need to calculate hash further down
+    if ( childFields.contains(HASH_EXPR_NAME)) {
+      return new FieldReference(HASH_EXPR_NAME);
+    }
+
     FieldReference fr = new FieldReference(childFields.get(fields.get(0).getFieldId()), ExpressionPosition.UNKNOWN);
     FunctionCall func = new FunctionCall("hash",  ImmutableList.of((LogicalExpression)fr),
ExpressionPosition.UNKNOWN);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f658a3c5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
index 8793849..907fcb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
@@ -18,14 +18,25 @@
 package org.apache.drill.exec.planner.physical.visitor;
 
 import com.google.common.collect.Lists;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
 import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.eigenbase.rel.RelNode;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
 
 import java.util.Collections;
 import java.util.List;
@@ -54,29 +65,72 @@ public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, Runt
   @Override
   public Prel visitExchange(ExchangePrel prel, Void value) throws RuntimeException {
     Prel child = ((Prel)prel.getChild()).accept(this, null);
-    // Whenever we encounter a HashToRandomExchangePrel:
+    // Whenever we encounter a HashToRandomExchangePrel
     //   If MuxExchange is enabled, insert a UnorderedMuxExchangePrel before HashToRandomExchangePrel.
     //   If DeMuxExchange is enabled, insert a UnorderedDeMuxExchangePrel after HashToRandomExchangePrel.
-    if (prel instanceof HashToRandomExchangePrel) {
-      Prel newPrel = child;
-      if (isMuxEnabled) {
-        newPrel = new UnorderedMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), child);
-      }
+    if (!(prel instanceof HashToRandomExchangePrel)) {
+      return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child)));
+    }
 
-      newPrel = new HashToRandomExchangePrel(prel.getCluster(),
-          prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields());
+    Prel newPrel = child;
 
-      if (isDeMuxEnabled) {
-        HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
-        // Insert a DeMuxExchange to narrow down the number of receivers
-        newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel,
-            hashExchangePrel.getFields());
+    HashToRandomExchangePrel hashPrel = (HashToRandomExchangePrel) prel;
+    final List<String> childFields = child.getRowType().getFieldNames();
+    List <RexNode> removeUpdatedExpr = Lists.newArrayList();
+
+    if ( isMuxEnabled ) {
+      // Insert Project Operator with new column that will be a hash for HashToRandomExchange
fields
+      List<DistributionField> fields = hashPrel.getFields();
+      final DrillSqlOperator sqlOpH = new DrillSqlOperator("hash", 1, MajorType.getDefaultInstance());
+      final DrillSqlOperator sqlOpX = new DrillSqlOperator("xor", 2, MajorType.getDefaultInstance());
+      RexNode prevRex = null;
+      List<String> outputFieldNames = Lists.newArrayList(childFields);
+      final RexBuilder rexBuilder = prel.getCluster().getRexBuilder();
+      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
+      for ( DistributionField field : fields) {
+        final int tmpField = field.getFieldId();
+        RexNode rex = rexBuilder.makeInputRef(childRowTypeFields.get(tmpField).getType(),
tmpField);
+        RexNode rexFunc = rexBuilder.makeCall(sqlOpH, rex);
+        if ( prevRex != null ) {
+          rexFunc = rexBuilder.makeCall(sqlOpX, prevRex, rexFunc);
+        }
+        prevRex = rexFunc;
+      }
+      List <RexNode> updatedExpr = Lists.newArrayList();
+      for ( RelDataTypeField field : childRowTypeFields) {
+        RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
+        updatedExpr.add(rex);
+        removeUpdatedExpr.add(rex);
       }
+      outputFieldNames.add(PrelUtil.HASH_EXPR_NAME);
+
+      updatedExpr.add(prevRex);
+      RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(),
updatedExpr, outputFieldNames);
 
-      return newPrel;
+      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(),
child, updatedExpr, rowType);
+
+      newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
+          addColumnprojectPrel);
+    }
+
+    newPrel = new HashToRandomExchangePrel(prel.getCluster(),
+        prel.getTraitSet(), newPrel, ((HashToRandomExchangePrel) prel).getFields());
+
+    if (isDeMuxEnabled) {
+      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
+      // Insert a DeMuxExchange to narrow down the number of receivers
+      newPrel = new UnorderedDeMuxExchangePrel(prel.getCluster(), prel.getTraitSet(), hashExchangePrel,
+          hashExchangePrel.getFields());
     }
 
-    return (Prel)prel.copy(prel.getTraitSet(), Collections.singletonList(((RelNode)child)));
+    if ( isMuxEnabled ) {
+      // remove earlier inserted Project Operator - since it creates issues down the road
in HashJoin
+      RelDataType removeRowType = RexUtil.createStructType(newPrel.getCluster().getTypeFactory(),
removeUpdatedExpr, childFields);
+
+      ProjectPrel removeColumnProjectPrel = new ProjectPrel(newPrel.getCluster(), newPrel.getTraitSet(),
newPrel, removeUpdatedExpr, removeRowType);
+      return removeColumnProjectPrel;
+    }
+    return newPrel;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f658a3c5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
index 72715a7..fc1c6b9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.TestBuilder;
@@ -31,6 +32,7 @@ import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -40,6 +42,9 @@ import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.QueryWorkUnit;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -70,6 +75,9 @@ public class TestLocalExchange extends PlanTestBase {
   private final static int CLUSTER_SIZE = 3;
   private final static String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
   private final static String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
+  private final static String MUX_EXCHANGE_CONST = "unordered-mux-exchange";
+  private final static String DEMUX_EXCHANGE_CONST = "unordered-demux-exchange";
+  private static final String HASH_EXCHANGE = "hash-to-random-exchange";
   private final static UserSession USER_SESSION = UserSession.Builder.newBuilder()
       .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
       .build();
@@ -82,6 +90,8 @@ public class TestLocalExchange extends PlanTestBase {
 
   private final static int NUM_DEPTS = 40;
   private final static int NUM_EMPLOYEES = 1000;
+  private final static int NUM_MNGRS = 1;
+  private final static int NUM_IDS = 1;
 
   private static String empTableLocation;
   private static String deptTableLocation;
@@ -119,8 +129,8 @@ public class TestLocalExchange extends PlanTestBase {
       File file = new File(empTableLocation + File.separator + fileIndex + ".json");
       PrintWriter printWriter = new PrintWriter(file);
       for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile;
recordIndex++) {
-        String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\",
\"dept_id\" : %d }",
-            recordIndex, recordIndex, recordIndex % NUM_DEPTS);
+        String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\",
\"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }",
+            recordIndex, recordIndex, recordIndex % NUM_DEPTS, recordIndex % NUM_MNGRS, recordIndex
% NUM_IDS);
         printWriter.println(record);
       }
       printWriter.close();
@@ -181,6 +191,37 @@ public class TestLocalExchange extends PlanTestBase {
   }
 
   @Test
+  public void testGroupByMultiFields() throws Exception {
+    // Test multifield hash generation
+
+    test("ALTER SESSION SET `planner.slice_target`=1");
+    test("ALTER SESSION SET `planner.enable_mux_exchange`=" + true);
+    test("ALTER SESSION SET `planner.enable_demux_exchange`=" + false);
+
+    final String groupByMultipleQuery = String.format("SELECT dept_id, mng_id, some_id, count(*)
as numEmployees FROM dfs.`%s` e GROUP BY dept_id, mng_id, some_id", empTableLocation);
+    final String[] groupByMultipleQueryBaselineColumns = new String[] { "dept_id", "mng_id",
"some_id", "numEmployees" };
+
+    final int numOccurrances = NUM_EMPLOYEES/NUM_DEPTS;
+
+    final String plan = getPlanInString("EXPLAIN PLAN FOR " + groupByMultipleQuery, JSON_FORMAT);
+    System.out.println("Plan: " + plan);
+
+    jsonExchangeOrderChecker(plan, false, 1, "xor\\(xor\\(hash\\(.*\\) , hash\\(.*\\) \\)
, hash\\(.*\\) \\) ");
+
+    // Run the query and verify the output
+    final TestBuilder testBuilder = testBuilder()
+        .sqlQuery(groupByMultipleQuery)
+        .unOrdered()
+        .baselineColumns(groupByMultipleQueryBaselineColumns);
+
+    for(int i = 0; i < NUM_DEPTS; i++) {
+      testBuilder.baselineValues(new Object[] { (long)i, (long)0, (long)0, (long)numOccurrances});
+    }
+
+    testBuilder.go();
+  }
+
+  @Test
   public void testGroupBy_NoMux_NoDeMux() throws Exception {
     testGroupByHelper(false, false);
   }
@@ -240,6 +281,14 @@ public class TestLocalExchange extends PlanTestBase {
     String plan = getPlanInString("EXPLAIN PLAN FOR " + query, JSON_FORMAT);
     System.out.println("Plan: " + plan);
 
+    if ( isMuxOn ) {
+      // # of hash exchanges should be = # of mux exchanges + # of demux exchanges
+      assertEquals("HashExpr on the hash column should not happen", 2*expectedNumMuxes+expectedNumDeMuxes,
StringUtils.countMatches(plan, PrelUtil.HASH_EXPR_NAME));
+      jsonExchangeOrderChecker(plan, isDeMuxOn, expectedNumMuxes, "hash(.*) ");
+    } else {
+      assertEquals("HashExpr on the hash column should not happen", 0, StringUtils.countMatches(plan,
PrelUtil.HASH_EXPR_NAME));
+    }
+
     // Make sure the plan has mux and demux exchanges (TODO: currently testing is rudimentary,
     // need to move it to sophisticated testing once we have better planning test tools are
available)
     assertEquals("Wrong number of MuxExchanges are present in the plan",
@@ -263,6 +312,81 @@ public class TestLocalExchange extends PlanTestBase {
     testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
   }
 
+  private static void jsonExchangeOrderChecker(String plan, boolean isDemuxEnabled, int expectedNumMuxes,
String hashExprPattern) throws Exception {
+    final JSONObject planObj = (JSONObject) new JSONParser().parse(plan);
+    assertNotNull("Corrupted query plan: null", planObj);
+    final JSONArray graphArray = (JSONArray) planObj.get("graph");
+    assertNotNull("No graph array present", graphArray);
+    int i = 0;
+    int k = 0;
+    int prevExprsArraySize = 0;
+    boolean foundExpr = false;
+    int muxesCount = 0;
+    for (Object object : graphArray) {
+      final JSONObject popObj = (JSONObject) object;
+      if ( popObj.containsKey("pop") && popObj.get("pop").equals("project")) {
+        if ( popObj.containsKey("exprs")) {
+          final JSONArray exprsArray = (JSONArray) popObj.get("exprs");
+          for (Object exprObj : exprsArray) {
+            final JSONObject expr = (JSONObject) exprObj;
+            if ( expr.containsKey("ref") && expr.get("ref").equals("`"+PrelUtil.HASH_EXPR_NAME
+"`")) {
+              // found a match. Let's see if next one is the one we need
+              final String hashField = (String) expr.get("expr");
+              assertNotNull("HashExpr field can not be null", hashField);
+              assertTrue("HashExpr field does not match pattern",hashField.matches(hashExprPattern));
+              k = i;
+              foundExpr = true;
+              muxesCount++;
+              break;
+            }
+          }
+          if ( foundExpr ) {
+            // will be reset to prevExprsArraySize-1 on the last project of the whole stanza
+            prevExprsArraySize = exprsArray.size();
+          }
+        }
+      }
+      if ( !foundExpr ) {
+        continue;
+      }
+      // next after project with hashexpr
+      if ( k == i-1) {
+        assertTrue("UnorderedMux should follow Project with HashExpr",
+            popObj.containsKey("pop") && popObj.get("pop").equals(MUX_EXCHANGE_CONST));
+      }
+      if ( k == i-2) {
+        assertTrue("HashToRandomExchange should follow UnorderedMux which should follow Project
with HashExpr",
+            popObj.containsKey("pop") && popObj.get("pop").equals(HASH_EXCHANGE));
+        // is HashToRandom is using HashExpr
+        assertTrue("HashToRandomExchnage should use hashExpr",
+            popObj.containsKey("expr") && popObj.get("expr").equals("`"+PrelUtil.HASH_EXPR_NAME
+"`"));
+      }
+      // if Demux is enabled it also should use HashExpr
+      if ( isDemuxEnabled && k == i-3) {
+        assertTrue("UnorderdDemuxExchange should follow HashToRandomExchange",
+            popObj.containsKey("pop") && popObj.get("pop").equals(DEMUX_EXCHANGE_CONST));
+        // is HashToRandom is using HashExpr
+        assertTrue("UnorderdDemuxExchange should use hashExpr",
+            popObj.containsKey("expr") && popObj.get("expr").equals("`"+PrelUtil.HASH_EXPR_NAME
+"`"));
+      }
+      if ( (isDemuxEnabled && k == i-4) || (!isDemuxEnabled && k == i-3)
) {
+        // it should be a project without hashexpr, check if number of exprs is 1 less then
in first project
+        assertTrue("Should be project without hashexpr", popObj.containsKey("pop") &&
popObj.get("pop").equals("project"));
+        final JSONArray exprsArray = (JSONArray) popObj.get("exprs");
+        assertNotNull("Project should have some fields", exprsArray);
+        assertEquals("Number of fields in closing project should be one less then in starting
project",
+            prevExprsArraySize, exprsArray.size());
+
+        // Now let's reset all the counters, flags if we are going to have another batch
of those exchanges
+        k = 0;
+        foundExpr = false;
+        prevExprsArraySize = 0;
+      }
+      i++;
+    }
+    assertEquals("Number of Project/Mux/HashExchange/... ", expectedNumMuxes, muxesCount);
+  }
+
   // Verify the number of partition senders in a major fragments is not more than the cluster
size and each endpoint
   // in the cluster has at most one fragment from a given major fragment that has the partition
sender.
   private static void testHelperVerifyPartitionSenderParallelization(


Mime
View raw message