phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject [3/9] phoenix git commit: PHOENIX-1556 Base hash versus sort merge join decision on cost
Date Wed, 14 Mar 2018 00:13:20 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index a15ab35..21cbc2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -35,6 +35,10 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.aggregator.Aggregators;
@@ -90,25 +94,30 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
-
-        if (byteCount == null) {
+        Double outputBytes = this.accept(new ByteCountVisitor());
+        Double inputRows = this.getDelegate().accept(new RowCountVisitor());
+        Double rowWidth = this.accept(new AvgRowWidthVisitor());
+        if (inputRows == null || outputBytes == null || rowWidth == null) {
             return Cost.UNKNOWN;
         }
+        double inputBytes = inputRows * rowWidth;
+        double rowsBeforeHaving = RowCountVisitor.aggregate(
+                RowCountVisitor.filter(
+                        inputRows.doubleValue(),
+                        RowCountVisitor.stripSkipScanFilter(
+                                context.getScan().getFilter())),
+                groupBy);
+        double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having);
+        double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+        double bytesAfterHaving = rowWidth * rowsAfterHaving;
 
         int parallelLevel = CostUtil.estimateParallelLevel(
                 false, context.getConnection().getQueryServices());
-        Cost cost = CostUtil.estimateAggregateCost(byteCount,
-                groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel);
+        Cost cost = CostUtil.estimateAggregateCost(
+                inputBytes, bytesBeforeHaving, groupBy, parallelLevel);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            double outputBytes = CostUtil.estimateAggregateOutputBytes(
-                    byteCount, groupBy, clientAggregators.getEstimatedByteSize());
-            Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel);
+            Cost orderByCost = CostUtil.estimateOrderByCost(
+                    bytesAfterHaving, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return super.getCost().plus(cost);
@@ -210,7 +219,16 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     public GroupBy getGroupBy() {
         return groupBy;
     }
-    
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    public Expression getHaving() {
+        return having;
+    }
+
     private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
         private final List<Expression> groupByExpressions;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
index ac43919..75ba8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -85,4 +85,8 @@ public abstract class ClientProcessingPlan extends DelegateQueryPlan {
     public FilterableStatement getStatement() {
         return statement;
     }
+
+    public Expression getWhere() {
+        return where;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 5799990..3427f5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -53,28 +55,30 @@ public class ClientScanPlan extends ClientProcessingPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double inputBytes = this.getDelegate().accept(new ByteCountVisitor()); // bytes produced by the delegate plan
+        Double outputBytes = this.accept(new ByteCountVisitor()); // bytes emitted by this plan

-        if (byteCount == null) {
+        if (inputBytes == null || outputBytes == null) {
             return Cost.UNKNOWN;
         }

-        Cost cost = new Cost(0, 0, byteCount);
         int parallelLevel = CostUtil.estimateParallelLevel(
                 false, context.getConnection().getQueryServices());
+        Cost cost = new Cost(0, 0, 0); // this node is charged only for the optional ORDER BY below
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            Cost orderByCost =
+                    CostUtil.estimateOrderByCost(inputBytes, outputBytes, parallelLevel); // sort input -> output bytes
             cost = cost.plus(orderByCost);
         }
         return super.getCost().plus(cost);
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 270ad3d..e3e0264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.optimize.Cost;
@@ -202,19 +205,18 @@ public class CorrelatePlan extends DelegateQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    public QueryPlan getRhsPlan() {
+        return rhs;
+    }
+
+    @Override
     public Cost getCost() {
-        Long lhsByteCount = null;
-        try {
-            lhsByteCount = delegate.getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
-        Long rhsRowCount = null;
-        try {
-            rhsRowCount = rhs.getEstimatedRowsToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double lhsByteCount = delegate.accept(new ByteCountVisitor());
+        Double rhsRowCount = rhs.accept(new RowCountVisitor());
 
         if (lhsByteCount == null || rhsRowCount == null) {
             return Cost.UNKNOWN;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
index cf0a3cf..0ecf74d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.CursorResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -51,6 +52,11 @@ public class CursorFetchPlan extends DelegateQueryPlan {
 	    return resultIterator;
 	}
 
+	@Override
+	public <T> T accept(QueryPlanVisitor<T> visitor) {
+		return visitor.visit(this);
+	}
+
 
 	@Override
 	public ExplainPlan getExplainPlan() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 23a0da6..6ade42e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -48,6 +48,9 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.execute.visitor.RowCountVisitor;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -63,10 +66,7 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.ParseNode;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.*;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -77,6 +77,7 @@ import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
@@ -92,6 +93,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
+    private final long serverCacheLimit;
     private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
@@ -132,8 +134,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         for (SubPlan subPlan : subPlans) {
             tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
         }
-        this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+        QueryServices services = plan.getContext().getConnection().getQueryServices();
+        this.maxServerCacheTimeToLive = services.getProps().getInt(
                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+        this.serverCacheLimit = services.getProps().getLong(
+                QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
     }
     
     @Override
@@ -270,40 +275,101 @@ public class HashJoinPlan extends DelegateQueryPlan {
         return statement;
     }
 
+    public HashJoinInfo getJoinInfo() {
+        return joinInfo;
+    }
+
+    public SubPlan[] getSubPlans() {
+        return subPlans;
+    }
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     @Override
     public Cost getCost() {
-        Long byteCount = null;
         try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+            Long r = delegate.getEstimatedRowsToScan();
+            Double w = delegate.accept(new AvgRowWidthVisitor());
+            if (r == null || w == null) {
+                return Cost.UNKNOWN;
+            }

-        if (byteCount == null) {
-            return Cost.UNKNOWN;
-        }
+            int parallelLevel = CostUtil.estimateParallelLevel(
+                    true, getContext().getConnection().getQueryServices());
+
+            double rowWidth = w;
+            double rows = RowCountVisitor.filter(
+                    r.doubleValue(),
+                    RowCountVisitor.stripSkipScanFilter(
+                            delegate.getContext().getScan().getFilter()));
+            double bytes = rowWidth * rows;
+            Cost cost = Cost.ZERO;
+            double rhsByteSum = 0.0;
+            for (int i = 0; i < subPlans.length; i++) {
+                double lhsBytes = bytes;
+                Double rhsRows = subPlans[i].getInnerPlan().accept(new RowCountVisitor());
+                Double rhsWidth = subPlans[i].getInnerPlan().accept(new AvgRowWidthVisitor());
+                if (rhsRows == null || rhsWidth == null) {
+                    return Cost.UNKNOWN;
+                }
+                double rhsBytes = rhsWidth * rhsRows;
+                rows = RowCountVisitor.join(rows, rhsRows, joinInfo.getJoinTypes()[i]);
+                rowWidth = AvgRowWidthVisitor.join(rowWidth, rhsWidth, joinInfo.getJoinTypes()[i]);
+                bytes = rowWidth * rows;
+                cost = cost.plus(CostUtil.estimateHashJoinCost(
+                        lhsBytes, rhsBytes, bytes, subPlans[i].hasKeyRangeExpression(), parallelLevel));
+                rhsByteSum += rhsBytes;
+            }

-        Cost cost = new Cost(0, 0, byteCount);
-        Cost lhsCost = delegate.getCost();
-        if (keyRangeExpressions != null) {
-            // The selectivity of the dynamic rowkey filter.
-            // TODO replace the constant with an estimate value.
-            double selectivity = 0.01;
-            lhsCost = lhsCost.multiplyBy(selectivity);
-        }
-        Cost rhsCost = Cost.ZERO;
-        for (SubPlan subPlan : subPlans) {
-            rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+            if (rhsByteSum > serverCacheLimit) { // RHS hash caches would exceed MAX_SERVER_CACHE_SIZE: plan not viable
+                return Cost.UNKNOWN;
+            }
+
+            // Calculate the cost of aggregation and ordering that is performed with the HashJoinPlan
+            if (delegate instanceof AggregatePlan) {
+                AggregatePlan aggPlan = (AggregatePlan) delegate;
+                double rowsBeforeHaving = RowCountVisitor.aggregate(rows, aggPlan.getGroupBy());
+                double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, aggPlan.getHaving());
+                double bytesBeforeHaving = rowWidth * rowsBeforeHaving;
+                double bytesAfterHaving = rowWidth * rowsAfterHaving;
+                Cost aggCost = CostUtil.estimateAggregateCost(
+                        bytes, bytesBeforeHaving, aggPlan.getGroupBy(), parallelLevel);
+                cost = cost.plus(aggCost);
+                rows = rowsAfterHaving;
+                bytes = bytesAfterHaving;
+            }
+            double outputRows = RowCountVisitor.limit(rows, delegate.getLimit());
+            double outputBytes = rowWidth * outputRows;
+            if (!delegate.getOrderBy().getOrderByExpressions().isEmpty()) {
+                int orderByParallelLevel = CostUtil.estimateParallelLevel( // sort parallelism depends on the delegate type
+                        delegate instanceof ScanPlan, getContext().getConnection().getQueryServices());
+                Cost orderByCost = CostUtil.estimateOrderByCost(
+                        bytes, outputBytes, orderByParallelLevel);
+                cost = cost.plus(orderByCost);
+            }
+
+            // Calculate the cost of child nodes
+            Cost lhsCost = new Cost(0, 0, r.doubleValue() * w);
+            Cost rhsCost = Cost.ZERO;
+            for (SubPlan subPlan : subPlans) {
+                rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost());
+            }
+            return cost.plus(lhsCost).plus(rhsCost);
+        } catch (SQLException ignored) { // row estimate unavailable; fall through to Cost.UNKNOWN
         }
-        return cost.plus(lhsCost).plus(rhsCost);
+        return Cost.UNKNOWN;
     }
 
-    protected interface SubPlan {
+    public interface SubPlan { // widened to public alongside getSubPlans() for the new visitor package
         public ServerCache execute(HashJoinPlan parent) throws SQLException;
         public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
         public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
         public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
         public QueryPlan getInnerPlan();
+        public boolean hasKeyRangeExpression(); // true when a key-range LHS expression is present; fed to estimateHashJoinCost
     }
     
     public static class WhereClauseSubPlan implements SubPlan {
@@ -383,6 +449,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public QueryPlan getInnerPlan() {
             return plan;
         }
+
+        @Override
+        public boolean hasKeyRangeExpression() {
+            return false;
+        }
     }
     
     public static class HashSubPlan implements SubPlan {        
@@ -495,6 +566,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public QueryPlan getInnerPlan() {
             return plan;
         }
+
+        @Override
+        public boolean hasKeyRangeExpression() {
+            return keyRangeLhsExpression != null;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index c9abb69..255fca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -81,6 +82,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d63950c..ed145a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -37,6 +37,8 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.BaseResultIterators;
@@ -202,16 +204,17 @@ public class ScanPlan extends BaseQueryPlan {
         } catch (SQLException e) {
             // ignored.
         }
+        Double outputBytes = this.accept(new ByteCountVisitor());
 
-        if (byteCount == null) {
+        if (byteCount == null || outputBytes == null) {
             return Cost.UNKNOWN;
         }
 
-        Cost cost = new Cost(0, 0, byteCount);
         int parallelLevel = CostUtil.estimateParallelLevel(
                 true, context.getConnection().getQueryServices());
+        Cost cost = new Cost(0, 0, byteCount);
         if (!orderBy.getOrderByExpressions().isEmpty()) {
-            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel);
+            Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, outputBytes, parallelLevel);
             cost = cost.plus(orderByCost);
         }
         return cost;
@@ -320,6 +323,11 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Long getEstimatedRowsToScan() throws SQLException {
         if (isSerial) {
             return serialRowsEstimate;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 2436d1e..978c7b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -47,6 +47,8 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.execute.visitor.ByteCountVisitor;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.MappedByteBufferQueue;
@@ -171,12 +173,7 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public Cost getCost() {
-        Long byteCount = null;
-        try {
-            byteCount = getEstimatedBytesToScan();
-        } catch (SQLException e) {
-            // ignored.
-        }
+        Double byteCount = this.accept(new ByteCountVisitor());
 
         if (byteCount == null) {
             return Cost.UNKNOWN;
@@ -255,7 +252,11 @@ public class SortMergeJoinPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return false;
     }
-    
+
+    public JoinType getJoinType() {
+        return type;
+    }
+
     private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) {
         SQLException e = null;
         try {
@@ -717,6 +718,11 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
     public Set<TableRef> getSourceRefs() {
         return tableRefs;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index f42af56..f869a4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
@@ -78,4 +79,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
         
         return iterator;
     }
+
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 3b5168c..6114d66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -112,6 +113,10 @@ public class UnionPlan implements QueryPlan {
         return iterators.getScans();
     }
 
+    public List<QueryPlan> getSubPlans() {
+        return plans;
+    }
+
     @Override
     public GroupBy getGroupBy() {
         return groupBy;
@@ -230,7 +235,12 @@ public class UnionPlan implements QueryPlan {
         return false;
     }
 
-	@Override
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
 	public Operation getOperation() {
 		return statement.getOperation();
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 51cb67e..0bc3df4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
 import org.apache.phoenix.expression.Expression;
@@ -64,6 +65,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
         return null;
     }
 
+    @Override
+    public <T> T accept(QueryPlanVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
     public class UnnestArrayResultIterator extends DelegateResultIterator {
         private final UnnestArrayElemRefExpression elemRefExpression;
         private final UnnestArrayElemIndexExpression elemIndexExpression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
new file mode 100644
index 0000000..9525747
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+
+/**
+ * Implementation of QueryPlanVisitor used to estimate the average number of bytes of each
+ * output row of a QueryPlan. A visit returns {@code null} when no estimate can be derived,
+ * e.g. when the underlying byte or row estimates are unavailable.
+ */
+public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        return widthFromScanEstimates(plan);
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        return widthFromScanEstimates(plan);
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        Double lhsWidth = plan.getDelegate().accept(this);
+        if (lhsWidth == null) {
+            return null;
+        }
+        // Guard against a null join info (a HashJoinPlan may also be used to evaluate
+        // subqueries without any join -- NOTE(review): confirm). With no joins, the output
+        // row is just the LHS row.
+        if (plan.getJoinInfo() == null) {
+            return lhsWidth;
+        }
+        JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo().getJoinTypes();
+        HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+        double width = lhsWidth;
+        for (int i = 0; i < joinTypes.length; i++) {
+            Double rhsWidth = subPlans[i].getInnerPlan().accept(this);
+            if (rhsWidth == null) {
+                return null;
+            }
+            width = join(width, rhsWidth, joinTypes[i]);
+        }
+
+        return width;
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double lhsWidth = plan.getLhsPlan().accept(this);
+        Double rhsWidth = plan.getRhsPlan().accept(this);
+        if (lhsWidth == null || rhsWidth == null) {
+            return null;
+        }
+
+        return join(lhsWidth, rhsWidth, plan.getJoinType());
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        int count = plan.getSubPlans().size();
+        if (count == 0) {
+            // Avoid returning 0.0 / 0 == NaN for a degenerate union.
+            return null;
+        }
+        double sum = 0.0;
+        for (QueryPlan subPlan : plan.getSubPlans()) {
+            Double avgWidth = subPlan.accept(this);
+            if (avgWidth == null) {
+                return null;
+            }
+            sum += avgWidth;
+        }
+
+        return sum / count;
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return (double) plan.getEstimatedSize();
+    }
+
+    /**
+     * Derives the average row width of a plan from its estimated bytes and rows to scan.
+     *
+     * @param plan the plan whose scan estimates are used
+     * @return 0.0 if no bytes are scanned, bytes / rows if both are known and rows is non-zero,
+     *         otherwise {@code null}
+     */
+    private static Double widthFromScanEstimates(QueryPlan plan) {
+        try {
+            Long byteCount = plan.getEstimatedBytesToScan();
+            Long rowCount = plan.getEstimatedRowsToScan();
+            if (byteCount == null || rowCount == null) {
+                return null;
+            }
+            if (byteCount == 0) {
+                return 0.0;
+            }
+            if (rowCount == 0) {
+                return null;
+            }
+            return ((double) byteCount) / rowCount;
+        } catch (SQLException e) {
+            // Estimation is best-effort: failing to read the estimates just leaves the width
+            // unknown, which callers already handle as a null result.
+            return null;
+        }
+    }
+
+
+    /*
+     * The below methods provide estimation of row width based on the input row width as well as
+     * the operator.
+     */
+
+    /**
+     * Estimates the output row width of a join: the sum of both sides for inner/outer joins,
+     * or just the LHS width for semi/anti joins.
+     *
+     * @throws IllegalArgumentException for an unsupported join type
+     */
+    public static double join(double lhsWidth, double rhsWidth, JoinTableNode.JoinType type) {
+        double width;
+        switch (type) {
+            case Inner:
+            case Left:
+            case Right:
+            case Full: {
+                width = lhsWidth + rhsWidth;
+                break;
+            }
+            case Semi:
+            case Anti: {
+                width = lhsWidth;
+                break;
+            }
+            default: {
+                throw new IllegalArgumentException("Invalid join type: " + type);
+            }
+        }
+        return width;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
new file mode 100644
index 0000000..61a2895
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+
+/**
+ * QueryPlanVisitor that estimates how many bytes a QueryPlan outputs. The estimate is the
+ * plan's estimated output row count multiplied by its estimated average row width.
+ */
+public class ByteCountVisitor implements QueryPlanVisitor<Double> {
+
+    @Override
+    public Double defaultReturn(final QueryPlan plan) {
+        return null;
+    }
+
+    // Every concrete plan type shares the same rows-times-width estimate.
+
+    @Override
+    public Double visit(final AggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final ScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final ClientAggregatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final ClientScanPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final LiteralResultIterationPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final TupleProjectionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final HashJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final SortMergeJoinPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final UnionPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final UnnestArrayPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final CorrelatePlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final CursorFetchPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final ListJarsQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    @Override
+    public Double visit(final TraceQueryPlan plan) {
+        return getByteCountFromRowCountAndRowWidth(plan);
+    }
+
+    /**
+     * Multiplies the plan's estimated output row count by its estimated average row width.
+     *
+     * @param plan the plan to estimate
+     * @return the estimated output bytes, or {@code null} if either estimate is unavailable
+     */
+    protected Double getByteCountFromRowCountAndRowWidth(QueryPlan plan) {
+        final RowCountVisitor rowCounter = new RowCountVisitor();
+        final Double rows = plan.accept(rowCounter);
+        final AvgRowWidthVisitor widthEstimator = new AvgRowWidthVisitor();
+        final Double width = plan.accept(widthEstimator);
+        final boolean bothKnown = rows != null && width != null;
+        return bothKnown ? Double.valueOf(rows * width) : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
new file mode 100644
index 0000000..a7ae3af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.*;
+
+/**
+ * Visitor over QueryPlan trees (a plan may wrap, join or union other plans). A concrete plan's
+ * {@code accept} dispatches to the matching {@code visit} overload. Plans without a dedicated
+ * overload fall back to {@link #defaultReturn(QueryPlan)}.
+ *
+ * @param <T> the type of value computed by the visitor
+ */
+public interface QueryPlanVisitor<T> {
+
+    /** Result for plan types that have no dedicated {@code visit} overload. */
+    T defaultReturn(QueryPlan plan);
+
+    T visit(AggregatePlan plan);
+
+    T visit(ScanPlan plan);
+
+    T visit(ClientAggregatePlan plan);
+
+    T visit(ClientScanPlan plan);
+
+    T visit(LiteralResultIterationPlan plan);
+
+    T visit(TupleProjectionPlan plan);
+
+    T visit(HashJoinPlan plan);
+
+    T visit(SortMergeJoinPlan plan);
+
+    T visit(UnionPlan plan);
+
+    T visit(UnnestArrayPlan plan);
+
+    T visit(CorrelatePlan plan);
+
+    T visit(CursorFetchPlan plan);
+
+    T visit(ListJarsQueryPlan plan);
+
+    T visit(TraceQueryPlan plan);
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
new file mode 100644
index 0000000..58ceea9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute.visitor;
+
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.phoenix.compile.GroupByCompiler;
+import org.apache.phoenix.compile.ListJarsQueryPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.TraceQueryPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.CorrelatePlan;
+import org.apache.phoenix.execute.CursorFetchPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.LiteralResultIterationPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.UnionPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.parse.JoinTableNode;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of QueryPlanVisitor used to estimate the number of output rows of a
+ * QueryPlan. A visit returns {@code null} when no estimate can be derived, e.g. when the
+ * underlying row estimates are unavailable.
+ */
+public class RowCountVisitor implements QueryPlanVisitor<Double> {
+
+    // An estimate of the ratio of result data from group-by against the input data.
+    private final static double GROUPING_FACTOR = 0.1;
+
+    // Heuristic multipliers used to estimate join output row counts.
+    private final static double OUTER_JOIN_FACTOR = 1.15;
+    private final static double INNER_JOIN_FACTOR = 0.85;
+    private final static double SEMI_OR_ANTI_JOIN_FACTOR = 0.5;
+
+    // Heuristic multiplier applied by union() when duplicates are removed.
+    private final static double UNION_DISTINCT_FACTOR = 0.8;
+
+    @Override
+    public Double defaultReturn(QueryPlan plan) {
+        return null;
+    }
+
+    @Override
+    public Double visit(AggregatePlan plan) {
+        try {
+            Long b = plan.getEstimatedRowsToScan();
+            if (b != null) {
+                return limit(
+                        filter(
+                                aggregate(
+                                        filter(
+                                                b.doubleValue(),
+                                                stripSkipScanFilter(
+                                                        plan.getContext().getScan().getFilter())),
+                                        plan.getGroupBy()),
+                                plan.getHaving()),
+                        plan.getLimit());
+            }
+        } catch (SQLException e) {
+            // Best-effort estimation: failing to read the estimate leaves the row count unknown.
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(ScanPlan plan) {
+        try {
+            Long b = plan.getEstimatedRowsToScan();
+            if (b != null) {
+                return limit(
+                        filter(
+                                b.doubleValue(),
+                                stripSkipScanFilter(plan.getContext().getScan().getFilter())),
+                        plan.getLimit());
+            }
+        } catch (SQLException e) {
+            // Best-effort estimation: failing to read the estimate leaves the row count unknown.
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(ClientAggregatePlan plan) {
+        Double b = plan.getDelegate().accept(this);
+        if (b != null) {
+            return limit(
+                    filter(
+                            aggregate(
+                                    filter(b.doubleValue(), plan.getWhere()),
+                                    plan.getGroupBy()),
+                            plan.getHaving()),
+                    plan.getLimit());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(ClientScanPlan plan) {
+        Double b = plan.getDelegate().accept(this);
+        if (b == null) {
+            // Without an input estimate, a LIMIT is still a usable upper bound on the output.
+            if (plan.getLimit() != null) {
+                return (double) plan.getLimit();
+            }
+            return null;
+        }
+
+        return limit(filter(b.doubleValue(), plan.getWhere()), plan.getLimit());
+    }
+
+    @Override
+    public Double visit(LiteralResultIterationPlan plan) {
+        return 1.0;
+    }
+
+    @Override
+    public Double visit(TupleProjectionPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(HashJoinPlan plan) {
+        try {
+            QueryPlan lhsPlan = plan.getDelegate();
+            Long b = lhsPlan.getEstimatedRowsToScan();
+            if (b == null) {
+                return null;
+            }
+
+            double rows = filter(b.doubleValue(),
+                    stripSkipScanFilter(lhsPlan.getContext().getScan().getFilter()));
+            // Guard against a null join info (a HashJoinPlan may also be used to evaluate
+            // subqueries without any join -- NOTE(review): confirm). Then there is no join to apply.
+            JoinTableNode.JoinType[] joinTypes = plan.getJoinInfo() == null
+                    ? new JoinTableNode.JoinType[0]
+                    : plan.getJoinInfo().getJoinTypes();
+            HashJoinPlan.SubPlan[] subPlans = plan.getSubPlans();
+            for (int i = 0; i < joinTypes.length; i++) {
+                Double rhsRows = subPlans[i].getInnerPlan().accept(this);
+                if (rhsRows == null) {
+                    return null;
+                }
+                rows = join(rows, rhsRows.doubleValue(), joinTypes[i]);
+            }
+            if (lhsPlan instanceof AggregatePlan) {
+                AggregatePlan aggPlan = (AggregatePlan) lhsPlan;
+                rows = filter(aggregate(rows, aggPlan.getGroupBy()), aggPlan.getHaving());
+            }
+            return limit(rows, lhsPlan.getLimit());
+        } catch (SQLException e) {
+            // Best-effort estimation: failing to read the estimate leaves the row count unknown.
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(SortMergeJoinPlan plan) {
+        Double lhsRows = plan.getLhsPlan().accept(this);
+        Double rhsRows = plan.getRhsPlan().accept(this);
+        if (lhsRows != null && rhsRows != null) {
+            return join(lhsRows, rhsRows, plan.getJoinType());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(UnionPlan plan) {
+        int count = plan.getSubPlans().size();
+        double[] inputRows = new double[count];
+        for (int i = 0; i < count; i++) {
+            Double b = plan.getSubPlans().get(i).accept(this);
+            if (b == null) {
+                return null;
+            }
+            inputRows[i] = b.doubleValue();
+        }
+
+        return limit(union(true, inputRows), plan.getLimit());
+    }
+
+    @Override
+    public Double visit(UnnestArrayPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(CorrelatePlan plan) {
+        Double lhsRows = plan.getDelegate().accept(this);
+        if (lhsRows != null) {
+            return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+        }
+
+        return null;
+    }
+
+    @Override
+    public Double visit(CursorFetchPlan plan) {
+        return plan.getDelegate().accept(this);
+    }
+
+    @Override
+    public Double visit(ListJarsQueryPlan plan) {
+        return 0.0;
+    }
+
+    @Override
+    public Double visit(TraceQueryPlan plan) {
+        return 0.0;
+    }
+
+    /**
+     * Returns the part of a scan filter that is counted as row-reducing for estimation.
+     * Non-list filters other than {@link BooleanExpressionFilter} are dropped. MUST_PASS_ALL
+     * lists are stripped recursively. Any other FilterList is returned unchanged.
+     *
+     * @param filter the scan filter, may be {@code null}
+     * @return the remaining filter, or {@code null} if nothing remains
+     */
+    public static Filter stripSkipScanFilter(Filter filter) {
+        if (filter == null) {
+            return null;
+        }
+        if (!(filter instanceof FilterList)) {
+            return filter instanceof BooleanExpressionFilter ? filter : null;
+        }
+        FilterList filterList = (FilterList) filter;
+        if (filterList.getOperator() != FilterList.Operator.MUST_PASS_ALL) {
+            return filter;
+        }
+        List<Filter> list = new ArrayList<>();
+        for (Filter f : filterList.getFilters()) {
+            Filter stripped = stripSkipScanFilter(f);
+            if (stripped != null) {
+                list.add(stripped);
+            }
+        }
+        if (list.isEmpty()) {
+            return null;
+        }
+        return list.size() == 1 ? list.get(0) : new FilterList(FilterList.Operator.MUST_PASS_ALL, list);
+    }
+
+
+    /*
+     * The below methods provide estimation of row count based on the input row count as well as
+     * the operator. They should be replaced by more accurate calculation based on histogram and
+     * a logical operator layer is expected to facilitate this.
+     */
+
+    /** Halves the row count when a filter is present; otherwise returns the input unchanged. */
+    public static double filter(double inputRows, Filter filter) {
+        if (filter == null) {
+            return inputRows;
+        }
+        return 0.5 * inputRows;
+    }
+
+    /** Halves the row count when a filter expression is present; otherwise returns the input. */
+    public static double filter(double inputRows, Expression filter) {
+        if (filter == null) {
+            return inputRows;
+        }
+        return 0.5 * inputRows;
+    }
+
+    /** Returns 1 row for an ungrouped aggregate, otherwise GROUPING_FACTOR of the input rows. */
+    public static double aggregate(double inputRows, GroupByCompiler.GroupBy groupBy) {
+        if (groupBy.isUngroupedAggregate()) {
+            return 1.0;
+        }
+        return GROUPING_FACTOR * inputRows;
+    }
+
+    /**
+     * Applies a LIMIT to a row estimate.
+     *
+     * @param inputRows the estimated input rows
+     * @param limit the LIMIT value, or {@code null} if there is none
+     * @return {@code inputRows} if there is no limit, otherwise the smaller of the two, since a
+     *         LIMIT can never produce more rows than its input
+     */
+    public static double limit(double inputRows, Integer limit) {
+        if (limit == null) {
+            return inputRows;
+        }
+        return Math.min(inputRows, limit.doubleValue());
+    }
+
+    /** Folds {@link #join(double, double, JoinTableNode.JoinType)} over successive RHS inputs. */
+    public static double join(double lhsRows, double[] rhsRows, JoinTableNode.JoinType[] types) {
+        assert rhsRows.length == types.length;
+        double rows = lhsRows;
+        for (int i = 0; i < rhsRows.length; i++) {
+            rows = join(rows, rhsRows[i], types[i]);
+        }
+        return rows;
+    }
+
+    /**
+     * Estimates join output rows: a fraction of the smaller side for inner joins, a multiple of
+     * the larger side for outer joins, and a fraction of the LHS for semi/anti joins.
+     *
+     * @throws IllegalArgumentException for an unsupported join type
+     */
+    public static double join(double lhsRows, double rhsRows, JoinTableNode.JoinType type) {
+        double rows;
+        switch (type) {
+            case Inner: {
+                rows = Math.min(lhsRows, rhsRows);
+                rows = rows * INNER_JOIN_FACTOR;
+                break;
+            }
+            case Left:
+            case Right:
+            case Full: {
+                rows = Math.max(lhsRows, rhsRows);
+                rows = rows * OUTER_JOIN_FACTOR;
+                break;
+            }
+            case Semi:
+            case Anti: {
+                rows = lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
+                break;
+            }
+            default: {
+                throw new IllegalArgumentException("Invalid join type: " + type);
+            }
+        }
+        return rows;
+    }
+
+    /**
+     * Sums the input row counts. When {@code all} is false (duplicates removed), the sum is
+     * scaled by UNION_DISTINCT_FACTOR.
+     */
+    public static double union(boolean all, double... inputRows) {
+        double rows = 0.0;
+        for (double d : inputRows) {
+            rows += d;
+        }
+        if (!all) {
+            rows *= UNION_DISTINCT_FACTOR;
+        }
+        return rows;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 0c9e383..f664a9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
@@ -732,6 +733,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 }
 
                 @Override
+                public <T> T accept(final QueryPlanVisitor<T> visitor) {
+                    // This ad-hoc plan has no dedicated visit overload, so defer to the
+                    // visitor's fallback.
+                    return visitor.defaultReturn(this);
+                }
+
+                @Override
                 public Long getEstimatedRowsToScan() {
                     return estimatedRows;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
index 1d4b8e0..db2b5ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -30,51 +30,52 @@ import org.apache.phoenix.query.QueryServices;
  */
 public class CostUtil {
 
-    // An estimate of the ratio of result data from group-by against the input data.
-    private final static double GROUPING_FACTOR = 0.1;
-
-    // Io operations conducted in intermediate evaluations like sorting or aggregation
-    // should be counted twice since they usually involve both read and write.
-    private final static double IO_COST_MULTIPLIER = 2.0;
-
     /**
-     * Estimate the number of output bytes of an aggregate.
-     * @param byteCount the number of input bytes
+     * Estimate the cost of an aggregate.
+     * @param inputBytes the number of input bytes
+     * @param outputBytes the number of output bytes
      * @param groupBy the compiled GroupBy object
-     * @param aggregatorsSize the byte size of aggregators
-     * @return the output byte count
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
      */
-    public static double estimateAggregateOutputBytes(
-            double byteCount, GroupBy groupBy, int aggregatorsSize) {
-        if (groupBy.isUngroupedAggregate()) {
-            return aggregatorsSize;
-        }
-        return byteCount * GROUPING_FACTOR;
+    public static Cost estimateAggregateCost(
+            double inputBytes, double outputBytes, GroupBy groupBy, int parallelLevel) {
+        // Clamp the input the same way estimateOrderByCost does. Math.log of a value below 1 is
+        // negative (and -Infinity for 0), which would produce a negative or infinite cost.
+        if (inputBytes < 1) {
+            inputBytes = 1;
+        }
+        // Order-preserving and ungrouped aggregates use a constant overhead factor. Otherwise
+        // the factor scales with the output size (at least 1).
+        double hashMapOverhead = groupBy.isOrderPreserving() || groupBy.isUngroupedAggregate()
+                ? 1
+                : Math.max(outputBytes, 1);
+        return new Cost(0, 0,
+                (outputBytes + hashMapOverhead * Math.log(inputBytes)) / parallelLevel);
     }
 
     /**
-     * Estimate the cost of an aggregate.
-     * @param byteCount the number of input bytes
-     * @param groupBy the compiled GroupBy object
-     * @param aggregatorsSize the byte size of aggregators
+     * Estimate the cost of an order-by
+     * @param inputBytes the number of input bytes
+     * @param outputBytes the number of output bytes, which may be different from inputBytes
+     *                    depending on whether there is a LIMIT
      * @param parallelLevel number of parallel workers or threads
      * @return the cost
      */
-    public static Cost estimateAggregateCost(
-            double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
-        double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
-        double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
-        return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateOrderByCost(double inputBytes, double outputBytes, int parallelLevel) {
+        // Keep the logarithm non-negative by treating sub-byte inputs as a single byte.
+        final double clampedInput = Math.max(inputBytes, 1);
+        final double sortCost = outputBytes + outputBytes * Math.log(clampedInput);
+        return new Cost(0, 0, sortCost / parallelLevel);
     }
 
     /**
-     * Estimate the cost of an order-by
-     * @param byteCount the number of input bytes
+     * Estimate the cost of a hash-join
+     * @param lhsBytes the number of left input bytes
+     * @param rhsBytes the number of right input bytes
+     * @param outputBytes the number of output bytes
      * @param parallelLevel number of parallel workers or threads
      * @return the cost
      */
-    public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
-        return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+    public static Cost estimateHashJoinCost(
+            double lhsBytes, double rhsBytes, double outputBytes,
+            boolean hasKeyRangeExpression, int parallelLevel) {
+        // RHS term: n * log(n) over the RHS byte count, clamped to at least 1 byte.
+        final double rhs = Math.max(rhsBytes, 1);
+        final double rhsTerm = rhs * Math.log(rhs);
+        // LHS term: the LHS input is charged only when hasKeyRangeExpression is false.
+        final double lhsTerm = hasKeyRangeExpression ? 0 : lhsBytes;
+        // The output bytes are added without being divided by parallelLevel.
+        return new Cost(0, 0, (rhsTerm + lhsTerm) / parallelLevel + outputBytes);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1903dda..69aeaad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.iterate.ParallelIterators;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
@@ -474,6 +475,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             }
 
             @Override
+            public <T> T accept(final QueryPlanVisitor<T> visitor) {
+                // Test stub plan: no dedicated visit overload, so use the visitor's fallback.
+                return visitor.defaultReturn(this);
+            }
+
+            @Override
             public Long getEstimatedRowsToScan() {
                 return null;
             }


Mime
View raw message