phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject phoenix git commit: Bug fixes for multi joins
Date Wed, 25 Mar 2015 22:14:55 GMT
Repository: phoenix
Updated Branches:
  refs/heads/calcite 9adb3e00f -> 208150098


Bug fixes for multi joins


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/20815009
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/20815009
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/20815009

Branch: refs/heads/calcite
Commit: 20815009859b2a0e179a8831289fc3e56a6cec67
Parents: 9adb3e0
Author: maryannxue <wei.xue@intel.com>
Authored: Wed Mar 25 18:14:31 2015 -0400
Committer: maryannxue <wei.xue@intel.com>
Committed: Wed Mar 25 18:14:31 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java | 54 +++++++++++++-------
 .../org/apache/phoenix/calcite/PhoenixJoin.java | 41 +++++++++++++--
 .../phoenix/calcite/PhoenixTableScan.java       | 14 ++++-
 .../calcite/PhoenixToEnumerableConverter.java   |  2 -
 4 files changed, 85 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index a719271..a9ad76b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -188,8 +188,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testTableScan() throws Exception {
         start().sql("select * from aTable where a_string = 'a'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n")
                 .resultIs(new Object[][] {
                           {"00D300000000XHP", "00A123122312312", "a"}, 
                           {"00D300000000XHP", "00A223122312312", "a"}, 
@@ -201,9 +200,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testProject() throws Exception {
         start().sql("select entity_id, a_string, organization_id from aTable where a_string
= 'a'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixProject(ENTITY_ID=[$1], A_STRING=[$2], ORGANIZATION_ID=[$0])\n"
+
-                           "    PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')],
project=[[$1, $2, $0]])\n")
                 .resultIs(new Object[][] {
                           {"00A123122312312", "a", "00D300000000XHP"}, 
                           {"00A223122312312", "a", "00D300000000XHP"}, 
@@ -217,11 +214,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], ORGANIZATION_ID=[$3])\n"
+
                            "    PhoenixJoin(condition=[AND(=($4, $1), =($3, $0))], joinType=[inner])\n"
+
-                           "      PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n"
+
-                           "        PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
-                           "      PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n"
+
-                           "        PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "          PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$0,
$1, $2]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'a')], project=[[$0, $1, $2]])\n")
                 .resultIs(new Object[][] {
                           {"00A123122312312", "a", "00D300000000XHP"}, 
                           {"00A223122312312", "a", "00D300000000XHP"}, 
@@ -233,10 +227,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n"
+
                            "    PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" +
-                           "      PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n"
+
-                           "        PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
-                           "      PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
-                           "        PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+                           "      PhoenixTableScan(table=[[phoenix, ITEMTABLE]], project=[[$0,
$1, $5]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], project=[[$0,
$1]])\n")
                 .resultIs(new Object[][] {
                           {"0000000001", "T1", "0000000001", "S1"}, 
                           {"0000000002", "T2", "0000000001", "S1"}, 
@@ -250,13 +242,37 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testMultiJoin() throws Exception {
         start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1
join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id
join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id
where t1.a_string = 'a'") 
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixProject(ENTITY_ID=[$1], A_STRING=[$20], ORGANIZATION_ID=[$36])\n"
+
-                           "    PhoenixJoin(condition=[AND(=($1, $37), =($0, $36))], joinType=[inner])\n"
+
-                           "      PhoenixJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n"
+
-                           "        PhoenixFilter(condition=[=($2, 'a')])\n" +
+                           "  PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n"
+
+                           "    PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n"
+
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
+                           "      PhoenixProject(ORGANIZATION_ID=[$18], ENTITY_ID=[$19],
A_STRING=[$20], B_STRING=[$21], A_INTEGER=[$22], A_DATE=[$23], A_TIME=[$24], A_TIMESTAMP=[$25],
X_DECIMAL=[$26], X_LONG=[$27], X_INTEGER=[$28], Y_INTEGER=[$29], A_BYTE=[$30], A_SHORT=[$31],
A_FLOAT=[$32], A_DOUBLE=[$33], A_UNSIGNED_FLOAT=[$34], A_UNSIGNED_DOUBLE=[$35], ORGANIZATION_ID0=[$0],
ENTITY_ID0=[$1], A_STRING0=[$2], B_STRING0=[$3], A_INTEGER0=[$4], A_DATE0=[$5], A_TIME0=[$6],
A_TIMESTAMP0=[$7], X_DECIMAL0=[$8], X_LONG0=[$9], X_INTEGER0=[$10], Y_INTEGER0=[$11], A_BYTE0=[$12],
A_SHORT0=[$13], A_FLOAT0=[$14], A_DOUBLE0=[$15], A_UNSIGNED_FLOAT0=[$16], A_UNSIGNED_DOUBLE0=[$17])\n"
+
+                           "        PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))],
joinType=[inner])\n" +
                            "          PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'a')])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}})
+                .close();
+        start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1
join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id
join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id")

+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n"
+
+                           "    PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n"
+
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
+                           "      PhoenixJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n"
+
                            "        PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                           "        PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}, 
+                          {"00B523122312312", "b", "00D300000000XHP"}, 
+                          {"00B623122312312", "b", "00D300000000XHP"}, 
+                          {"00B723122312312", "b", "00D300000000XHP"}, 
+                          {"00B823122312312", "b", "00D300000000XHP"}, 
+                          {"00C923122312312", "c", "00D300000000XHP"}})
                 .close();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index b666984..e5f9cda 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -6,12 +6,17 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
@@ -43,19 +48,37 @@ public class PhoenixJoin extends Join implements PhoenixRel {
     }
 
     @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        double rowCount = RelMetadataQuery.getRowCount(this);
+        
+        for (RelNode input : getInputs()) {
+            double inputRowCount = input.getRows();
+            if (Double.isInfinite(inputRowCount)) {
+                rowCount = inputRowCount;
+            } else if (input == getLeft() && isHashJoinDoable()) {
+                rowCount += inputRowCount;
+            } else {
+                rowCount += Util.nLogN(inputRowCount);
+            }
+        }
+        RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
+
+        return cost.multiplyBy(PHOENIX_FACTOR);
+    }
+    
+    @Override
     public QueryPlan implement(Implementor implementor) {
         assert getLeft().getConvention() == PhoenixRel.CONVENTION;
         assert getRight().getConvention() == PhoenixRel.CONVENTION;
         PhoenixRel left = (PhoenixRel) getLeft();
         PhoenixRel right = (PhoenixRel) getRight();
-        boolean hashRHS = (left instanceof PhoenixTableScan) && getJoinType() !=
JoinRelType.RIGHT;
-        if (!hashRHS)
+        if (!isHashJoinDoable())
             throw new UnsupportedOperationException();
         
         JoinInfo joinInfo = JoinInfo.of(left, right, getCondition());
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
-        implementor.pushContext(new ImplementorContext(true));
+        implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns()));
         QueryPlan leftPlan = implementor.visitInput(0, left);
         PTable leftTable = implementor.getTableRef().getTable();
         for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();)
{
@@ -86,7 +109,8 @@ public class PhoenixJoin extends Join implements PhoenixRel {
             throw new RuntimeException(e);
         }
         implementor.setTableRef(new TableRef(joinedTable));
-        Expression postFilterExpr = CalciteUtils.toExpression(joinInfo.getRemaining(getCluster().getRexBuilder()),
implementor);
+        RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder());
+        Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter,
implementor);
         @SuppressWarnings("unchecked")
         HashJoinInfo hashJoinInfo = new HashJoinInfo(
                 joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, 
@@ -99,6 +123,15 @@ public class PhoenixJoin extends Join implements PhoenixRel {
         return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new
HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false,
null, null)});
     }
     
+    private boolean isHashJoinDoable() {
+        // TODO check memory limit
+        RelNode rel = getLeft();
+        if (rel instanceof RelSubset) {
+            rel = ((RelSubset) rel).getBest();
+        }
+        return (rel instanceof PhoenixTableScan) && getJoinType() != JoinRelType.RIGHT;
+    }
+    
     private JoinType convertJoinType(JoinRelType type) {
         JoinType ret = null;
         switch (type) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index 8c6153c..f681c88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -69,6 +69,8 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
         for (RelOptRule rule : rules) {
             planner.addRule(rule);
         }
+        planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+        planner.addRule(PhoenixProjectScanMergeRule.INSTANCE);
     }
 
     @Override
@@ -85,8 +87,18 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
             final Double selectivity = RelMetadataQuery.getSelectivity(this, filter);
             cost = cost.multiplyBy(selectivity);
         }
+        if (projects != null) {
+            final double projectFieldRatio = ((double) projects.size()) / getRowType().getFieldCount();
+            cost = cost.multiplyBy(projectFieldRatio);
+        }
         return cost;
     }
+    
+    @Override
+    public double getRows() {
+        return super.getRows()
+                * RelMetadataQuery.getSelectivity(this, filter);
+    }
 
     @Override
     public QueryPlan implement(Implementor implementor) {
@@ -128,7 +140,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel
{
         KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
         List<Expression> exprs = Lists.<Expression> newArrayList();
         for (PColumn column : table.getColumns()) {
-            if (!SchemaUtil.isPKColumn(column)) {
+            if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().isRetainPKColumns())
{
                 Expression expr = implementor.newColumnExpression(column.getPosition());
                 exprs.add(expr);
                 builder.addField(expr);                

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
index cad1d66..d1750e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
@@ -72,8 +72,6 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements
Enume
     }
     
     static QueryPlan makePlan(PhoenixRel rel) {
-        Program p = Programs.ofRules(PhoenixFilterScanMergeRule.INSTANCE, PhoenixProjectScanMergeRule.INSTANCE);
-        rel = (PhoenixRel) (p.run(rel.getCluster().getPlanner(), rel, RelTraitSet.createEmpty()));
         final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl();
         return phoenixImplementor.visitInput(0, rel);
     }


Mime
View raw message