phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject phoenix git commit: PHOENIX-2193 Add rules to push down Sort through Union
Date Wed, 21 Oct 2015 02:42:08 GMT
Repository: phoenix
Updated Branches:
  refs/heads/calcite 7f2d11784 -> 145db4a20


PHOENIX-2193 Add rules to push down Sort through Union


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/145db4a2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/145db4a2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/145db4a2

Branch: refs/heads/calcite
Commit: 145db4a20b5fd4b8a8c030406525af25e0651948
Parents: 7f2d117
Author: maryannxue <wei.xue@intel.com>
Authored: Tue Oct 20 22:41:18 2015 -0400
Committer: maryannxue <wei.xue@intel.com>
Committed: Tue Oct 20 22:41:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 36 ++++++---
 .../calcite/jdbc/PhoenixPrepareImpl.java        | 45 +++++++++++
 .../calcite/metadata/PhoenixRelMdCollation.java | 58 ++++++++++++++
 .../calcite/rel/PhoenixAbstractSort.java        |  4 +-
 .../phoenix/calcite/rel/PhoenixClientSort.java  |  2 +-
 .../calcite/rel/PhoenixCompactClientSort.java   |  2 +-
 .../calcite/rel/PhoenixMergeSortUnion.java      | 79 ++++++++++++++++++++
 .../phoenix/calcite/rel/PhoenixServerSort.java  |  2 +-
 .../calcite/rules/PhoenixConverterRules.java    | 41 ++++++++++
 9 files changed, 254 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 0717d74..f8641da 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -1046,21 +1046,37 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select entity_id, a_string from atable where a_string = 'a' union all select entity_id, a_string from atable where a_string = 'c' order by entity_id desc limit 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[3])\n" +
-                           "    PhoenixClientSort(sort0=[$0], dir0=[DESC])\n" +
-                           "      PhoenixUnion(all=[true])\n" +
-                           "        PhoenixLimit(fetch=[3])\n" +
-                           "          PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
-                           "            PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
-                           "              PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'a')])\n" +
-                           "        PhoenixLimit(fetch=[3])\n" +
-                           "          PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
-                           "            PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
-                           "              PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'c')])\n")
+                           "    PhoenixMergeSortUnion(all=[true])\n" +
+                           "      PhoenixLimit(fetch=[3])\n" +
+                           "        PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "          PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
+                           "            PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'a')])\n" +
+                           "      PhoenixLimit(fetch=[3])\n" +
+                           "        PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "          PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
+                           "            PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'c')])\n")
                 .resultIs(new Object[][] {
                         {"00C923122312312", "c"},
                         {"00A423122312312", "a"},
                         {"00A323122312312", "a"}})
                 .close();
+        
+        start(false).sql("select entity_id, a_string from atable where a_string = 'a' union all select entity_id, a_string from atable where a_string = 'c' order by entity_id desc")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixMergeSortUnion(all=[true])\n" +
+                           "    PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "      PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
+                           "        PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'a')])\n" +
+                           "    PhoenixServerSort(sort0=[$0], dir0=[DESC])\n" +
+                           "      PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2])\n"
+
+                           "        PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2,
'c')])\n")
+                .resultIs(new Object[][] {
+                        {"00C923122312312", "c"},
+                        {"00A423122312312", "a"},
+                        {"00A323122312312", "a"},
+                        {"00A223122312312", "a"},
+                        {"00A123122312312", "a"}})
+                .close();
     }
     
     @Test public void testUnnest() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
index 84ac6bf..d3ed709 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
@@ -1,16 +1,27 @@
 package org.apache.phoenix.calcite.jdbc;
 
+import java.util.List;
+
 import org.apache.calcite.adapter.enumerable.EnumerableRules;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptCostFactory;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.prepare.Prepare.Materialization;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.Program;
+import org.apache.calcite.tools.Programs;
+import org.apache.calcite.util.Holder;
+import org.apache.calcite.util.Pair;
 import org.apache.phoenix.calcite.PhoenixSchema;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
 import org.apache.phoenix.calcite.parse.SqlCreateView;
 import org.apache.phoenix.calcite.parser.PhoenixParserImpl;
 import org.apache.phoenix.calcite.rules.PhoenixAddScanLimitRule;
@@ -19,6 +30,8 @@ import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
 import org.apache.phoenix.calcite.rules.PhoenixInnerSortRemoveRule;
 import org.apache.phoenix.calcite.rules.PhoenixJoinSingleValueAggregateMergeRule;
 
+import com.google.common.base.Function;
+
 public class PhoenixPrepareImpl extends CalcitePrepareImpl {
     public static final ThreadLocal<String> THREAD_SQL_STRING =
         new ThreadLocal<>();
@@ -73,6 +86,38 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
                 }
             }
         }
+        
+        Hook.PROGRAM.add(new Function<Pair<List<Materialization>, Holder<Program>>,
Object>() {
+			@Override
+			public Object apply(
+					Pair<List<Materialization>, Holder<Program>> input) {
+				final Program program1 =
+						new Program() {
+					public RelNode run(RelOptPlanner planner, RelNode rel,
+							RelTraitSet requiredOutputTraits) {
+						final RelNode rootRel2 =
+								rel.getTraitSet().equals(requiredOutputTraits)
+								? rel
+										: planner.changeTraits(rel, requiredOutputTraits);
+						assert rootRel2 != null;
+
+						planner.setRoot(rootRel2);
+						final RelOptPlanner planner2 = planner.chooseDelegate();
+						final RelNode rootRel3 = planner2.findBestExp();
+						assert rootRel3 != null : "could not implement exp";
+						return rootRel3;
+					}
+				};
+
+				// Second planner pass to do physical "tweaks". This the first time that
+				// EnumerableCalcRel is introduced.
+				final Program program2 = Programs.hep(Programs.CALC_RULES, true, new PhoenixRelMetadataProvider());;
+
+				Program p = Programs.sequence(program1, program2);
+				input.getValue().set(p);
+				return null;
+			}
+        });
 
         return planner;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
index 1b559f0..cb6b232 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdCollation.java
@@ -1,6 +1,7 @@
 package org.apache.phoenix.calcite.metadata;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
@@ -16,9 +17,11 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
 import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 
 public class PhoenixRelMdCollation {
     public static final RelMetadataProvider SOURCE =
@@ -42,6 +45,10 @@ public class PhoenixRelMdCollation {
     public ImmutableList<RelCollation> collations(PhoenixClientJoin join) {
         return ImmutableList.copyOf(PhoenixRelMdCollation.mergeJoin(join.getLeft(), join.getRight(),
join.joinInfo.leftKeys, join.joinInfo.rightKeys));
     }
+
+    public ImmutableList<RelCollation> collations(PhoenixMergeSortUnion union) {
+        return ImmutableList.copyOf(PhoenixRelMdCollation.mergeSortUnion(union.getInputs(),
union.all));
+    }
     
     /** Helper method to determine a {@link PhoenixCorrelate}'s collation. */
     public static List<RelCollation> correlate(RelNode left, RelNode right, SemiJoinType
joinType) {
@@ -75,5 +82,56 @@ public class PhoenixRelMdCollation {
         }
         return builder.build();
     }
+    
+    public static List<RelCollation> mergeSortUnion(List<RelNode> inputs, boolean
all) {
+    	if (!all) {
+    		return ImmutableList.of(RelCollations.EMPTY);
+    	}
+    	
+    	Set<RelCollation> mergedCollations = null;
+    	for (RelNode input : inputs) {
+    		final ImmutableList<RelCollation> inputCollations = RelMetadataQuery.collations(input);
+    		Set<RelCollation> nonEmptyInputCollations = Sets.newHashSet();
+			for (RelCollation collation : inputCollations) {
+				if (!collation.getFieldCollations().isEmpty()) {
+					nonEmptyInputCollations.add(collation);
+				}
+			}
+    		
+			if (nonEmptyInputCollations.isEmpty() || mergedCollations == null) {
+    			mergedCollations = nonEmptyInputCollations;
+    		} else {
+    			Set<RelCollation> newCollations = Sets.newHashSet();
+    			for (RelCollation m : mergedCollations) {
+    				for (RelCollation n : nonEmptyInputCollations) {
+    					if (n.satisfies(m)) {
+    						newCollations.add(m);
+    						break;
+    					}
+    				}
+    			}
+    			for (RelCollation n : nonEmptyInputCollations) {
+    				for (RelCollation m : mergedCollations) {
+    					if (m.satisfies(n)) {
+    						newCollations.add(n);
+    						break;
+    					}
+    				}
+    			}
+    			mergedCollations = newCollations;
+    		}
+			
+    		if (mergedCollations.isEmpty()) {
+    			break;
+    		}
+    	}
+    	
+    	// We only return the simplified collation here because PhoenixMergeSortUnion
+    	// needs a definite way for implement().
+		if (mergedCollations.size() != 1) {
+			return ImmutableList.of(RelCollations.EMPTY);
+		}
+        return ImmutableList.of(mergedCollations.iterator().next());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index c2ac235..66ad9f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -32,9 +32,9 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel
{
         assert !getCollation().getFieldCollations().isEmpty();
     }
     
-    protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector)
{
+    protected static OrderBy getOrderBy(RelCollation collation, Implementor implementor,
TupleProjector tupleProjector) {
         List<OrderByExpression> orderByExpressions = Lists.newArrayList();
-        for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
+        for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
             Expression expr = tupleProjector == null ? 
                       implementor.newColumnExpression(fieldCollation.getFieldIndex()) 
                     : tupleProjector.getExpressions()[fieldCollation.getFieldIndex()];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 09218c8..f5a65df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -67,7 +67,7 @@ public class PhoenixClientSort extends PhoenixAbstractSort {
             throw new RuntimeException(e);
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, null);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null);
         
         return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR,
null, null, orderBy, plan);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
index d881f3f..15372bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
@@ -75,7 +75,7 @@ public class PhoenixCompactClientSort extends PhoenixAbstractSort {
             basePlan = (AggregatePlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, tupleProjector);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, tupleProjector);
         QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy);
         
         if (hashJoinPlan != null) {        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
new file mode 100644
index 0000000..f3e162b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
@@ -0,0 +1,79 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.parse.SelectStatement;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
+	public final RelCollation collation;
+    
+    public static PhoenixMergeSortUnion create(final List<RelNode> inputs, final boolean
all) {
+    	final List<RelCollation> collationList = PhoenixRelMdCollation.mergeSortUnion(inputs,
all);
+    	assert collationList.size() == 1;
+    	final RelCollation collation = collationList.get(0);
+        RelOptCluster cluster = inputs.get(0).getCluster();
+        RelTraitSet traits = 
+        		cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION)
+                .replaceIfs(RelCollationTraitDef.INSTANCE,
+                        new Supplier<List<RelCollation>>() {
+                    public List<RelCollation> get() {
+                        return ImmutableList.<RelCollation> of(collation);
+                    }
+                });
+        return new PhoenixMergeSortUnion(cluster, traits, inputs, all, collation);
+    }
+    
+    private PhoenixMergeSortUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode>
inputs, boolean all, RelCollation collation) {
+        super(cluster, traits, inputs, all);
+        this.collation = collation;
+    }
+
+    @Override
+    public PhoenixMergeSortUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean
all) {
+        return create(inputs, all);
+    }
+
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        for (RelNode input : getInputs()) {
+            if (input.getConvention() != PhoenixRel.CLIENT_CONVENTION) {
+                return planner.getCostFactory().makeInfiniteCost();
+            }
+        }
+        
+        double mergeSortFactor = 1.1;
+        return super.computeSelfCost(planner)
+                .multiplyBy(PHOENIX_FACTOR).multiplyBy(mergeSortFactor);
+    }
+
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
+        for (Ord<RelNode> input : Ord.zip(inputs)) {
+            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
+        }
+        
+        final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, implementor, null);
+        return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(),
RowProjector.EMPTY_PROJECTOR,
+                null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
index 0818ce6..b43754c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
@@ -68,7 +68,7 @@ public class PhoenixServerSort extends PhoenixAbstractSort {
             basePlan = (ScanPlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor, null);
+        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null);
         QueryPlan newPlan;
         try {
             newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/145db4a2/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index c60c27b..bab9036 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.trace.CalciteTrace;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
@@ -48,6 +49,7 @@ import org.apache.phoenix.calcite.rel.PhoenixClientSort;
 import org.apache.phoenix.calcite.rel.PhoenixCorrelate;
 import org.apache.phoenix.calcite.rel.PhoenixFilter;
 import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixMergeSortUnion;
 import org.apache.phoenix.calcite.rel.PhoenixRel;
 import org.apache.phoenix.calcite.rel.PhoenixServerAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
@@ -89,6 +91,7 @@ public class PhoenixConverterRules {
         PhoenixServerAggregateRule.SERVER,
         PhoenixServerAggregateRule.SERVERJOIN,
         PhoenixUnionRule.INSTANCE,
+        PhoenixMergeSortUnionRule.INSTANCE,
         PhoenixClientJoinRule.INSTANCE,
         PhoenixServerJoinRule.INSTANCE,
         PhoenixClientSemiJoinRule.INSTANCE,
@@ -113,6 +116,7 @@ public class PhoenixConverterRules {
         PhoenixServerAggregateRule.CONVERTIBLE_SERVER,
         PhoenixServerAggregateRule.CONVERTIBLE_SERVERJOIN,
         PhoenixUnionRule.CONVERTIBLE,
+        PhoenixMergeSortUnionRule.CONVERTIBLE,
         PhoenixClientJoinRule.CONVERTIBLE,
         PhoenixServerJoinRule.CONVERTIBLE,
         PhoenixClientSemiJoinRule.INSTANCE,
@@ -484,6 +488,43 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.Union} to a
+     * {@link PhoenixMergeSortUnion}.
+     */
+    public static class PhoenixMergeSortUnionRule extends PhoenixConverterRule {
+        private static Predicate<LogicalUnion> IS_CONVERTIBLE = new Predicate<LogicalUnion>()
{
+            @Override
+            public boolean apply(LogicalUnion input) {
+                return isConvertible(input);
+            }            
+        };
+        
+        private static Predicate<LogicalUnion> NON_EMPTY_COLLATION = new Predicate<LogicalUnion>()
{
+			@Override
+			public boolean apply(LogicalUnion input) {
+				List<RelCollation> collations = PhoenixRelMdCollation.mergeSortUnion(input.getInputs(),
input.all);
+				return collations.size() == 1 && !collations.get(0).getFieldCollations().isEmpty();
+			}
+        };
+        
+        public static final PhoenixMergeSortUnionRule INSTANCE = new PhoenixMergeSortUnionRule(NON_EMPTY_COLLATION);
+        
+        public static final PhoenixMergeSortUnionRule CONVERTIBLE = new PhoenixMergeSortUnionRule(Predicates.and(IS_CONVERTIBLE,
NON_EMPTY_COLLATION));
+
+        private PhoenixMergeSortUnionRule(Predicate<LogicalUnion> predicate) {
+            super(LogicalUnion.class, predicate, Convention.NONE, 
+                    PhoenixRel.CLIENT_CONVENTION, "PhoenixMergeSortUnionRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final LogicalUnion union = (LogicalUnion) rel;
+            return PhoenixMergeSortUnion.create(
+                    convertList(union.getInputs(), out),
+                    union.all);
+        }
+    }
+
+    /**
      * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a
      * {@link PhoenixClientJoin}.
      */


Mime
View raw message