phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject phoenix git commit: PHOENIX-2198 Support correlate variable
Date Wed, 09 Sep 2015 20:59:16 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master d18afe183 -> 2acb38a9c


PHOENIX-2198 Support correlate variable


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2acb38a9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2acb38a9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2acb38a9

Branch: refs/heads/master
Commit: 2acb38a9c64d0d4f6551af9ad2767b9e36c6975a
Parents: d18afe1
Author: maryannxue <wei.xue@intel.com>
Authored: Wed Sep 9 16:58:57 2015 -0400
Committer: maryannxue <wei.xue@intel.com>
Committed: Wed Sep 9 16:58:57 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/execute/AggregatePlan.java   |   9 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  22 +-
 .../apache/phoenix/execute/CorrelatePlan.java   | 208 ++++++++++++++++
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   9 +-
 .../execute/LiteralResultIterationPlan.java     |  11 +-
 .../apache/phoenix/execute/RuntimeContext.java  |  33 +++
 .../phoenix/execute/RuntimeContextImpl.java     |  86 +++++++
 .../org/apache/phoenix/execute/ScanPlan.java    |   9 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   8 +-
 .../CorrelateVariableFieldAccessExpression.java |  75 ++++++
 .../phoenix/expression/ExpressionType.java      |  11 +-
 .../visitor/CloneExpressionVisitor.java         |   6 +
 .../expression/visitor/ExpressionVisitor.java   |   2 +
 .../StatelessTraverseAllExpressionVisitor.java  |   7 +
 .../StatelessTraverseNoExpressionVisitor.java   |   7 +
 .../UngroupedAggregatingResultIterator.java     |   3 +-
 .../phoenix/execute/CorrelatePlanTest.java      | 248 +++++++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   6 +-
 20 files changed, 744 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 67222d3..9a415b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -77,7 +77,14 @@ public class AggregatePlan extends BaseQueryPlan {
             StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
             Expression having) {
-        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory);
+        this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null);
+    }
+    
+    private AggregatePlan(
+            StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+            Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
+            Expression having, Expression dynamicFilter) {
+        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
         this.having = having;
         this.aggregators = context.getAggregationManager().getAggregators();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index af5b25a..a71ffec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -41,7 +41,9 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
@@ -93,12 +95,19 @@ public abstract class BaseQueryPlan implements QueryPlan {
     protected final Integer limit;
     protected final OrderBy orderBy;
     protected final GroupBy groupBy;
-    protected final ParallelIteratorFactory parallelIteratorFactory;
+    protected final ParallelIteratorFactory parallelIteratorFactory;    
+    /*
+     * The filter expression that contains CorrelateVariableFieldAccessExpression
+     * and will have impact on the ScanRanges. It will recompiled at runtime 
+     * immediately before creating the ResultIterator.
+     */
+    protected final Expression dynamicFilter;
 
     protected BaseQueryPlan(
             StatementContext context, FilterableStatement statement, TableRef table,
             RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
-            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
+            Expression dynamicFilter) {
         this.context = context;
         this.statement = statement;
         this.tableRef = table;
@@ -108,6 +117,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         this.orderBy = orderBy;
         this.groupBy = groupBy;
         this.parallelIteratorFactory = parallelIteratorFactory;
+        this.dynamicFilter = dynamicFilter;
     }
 
     @Override
@@ -141,6 +151,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
     public RowProjector getProjector() {
         return projection;
     }
+    
+    public Expression getDynamicFilter() {
+        return dynamicFilter;
+    }
 
 //    /**
 //     * Sets up an id used to do round robin queue processing on the server
@@ -175,6 +189,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
         Scan scan = context.getScan();
         PTable table = context.getCurrentTable().getTable();
         
+        if (dynamicFilter != null) {
+            WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null);            
+        }
+        
         if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
             ScanUtil.setReversed(scan);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
new file mode 100644
index 0000000..1b0af8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+
+public class CorrelatePlan extends DelegateQueryPlan {    
+    private final QueryPlan rhs;
+    private final String variableId;
+    private final JoinType joinType;
+    private final boolean isSingleValueOnly;
+    private final RuntimeContext runtimeContext;
+    private final KeyValueSchema joinedSchema;
+    private final KeyValueSchema lhsSchema;
+    private final KeyValueSchema rhsSchema;
+    private final int rhsFieldPosition;
+
+    public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, 
+            JoinType joinType, boolean isSingleValueOnly, 
+            RuntimeContext runtimeContext, PTable joinedTable, 
+            PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
+        super(lhs);
+        if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti)
+            throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan");
+        
+        this.rhs = rhs;
+        this.variableId = variableId;
+        this.joinType = joinType;
+        this.isSingleValueOnly = isSingleValueOnly;
+        this.runtimeContext = runtimeContext;
+        this.joinedSchema = buildSchema(joinedTable);
+        this.lhsSchema = buildSchema(lhsTable);
+        this.rhsSchema = buildSchema(rhsTable);
+        this.rhsFieldPosition = rhsFieldPosition;
+    }
+
+    private static KeyValueSchema buildSchema(PTable table) {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        if (table != null) {
+            for (PColumn column : table.getColumns()) {
+                if (!SchemaUtil.isPKColumn(column)) {
+                    builder.addField(column);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> steps = Lists.newArrayList();
+        steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES");
+        for (String step : delegate.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
+        for (String step : rhs.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        return new ExplainPlan(steps);
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+            throws SQLException {
+        return new ResultIterator() {
+            private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
+            private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
+            private final ValueBitSet rhsBitSet = 
+                    (joinType == JoinType.Semi || joinType == JoinType.Anti) ?
+                            ValueBitSet.EMPTY_VALUE_BITSET 
+                          : ValueBitSet.newInstance(rhsSchema);
+            private final ResultIterator iter = delegate.iterator();
+            private ResultIterator rhsIter = null;
+            private Tuple current = null;
+            private boolean closed = false;
+
+            @Override
+            public void close() throws SQLException {
+                if (!closed) {
+                    closed = true;
+                    iter.close();
+                    if (rhsIter != null) {
+                        rhsIter.close();
+                    }
+                }
+            }
+
+            @Override
+            public Tuple next() throws SQLException {
+                if (closed)
+                    return null;
+                
+                Tuple rhsCurrent = null;
+                if (rhsIter != null) {
+                    rhsCurrent = rhsIter.next();
+                    if (rhsCurrent == null) {
+                        rhsIter.close();
+                        rhsIter = null;
+                    } else if (isSingleValueOnly) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+                    }
+                }
+                while (rhsIter == null) {
+                    current = iter.next();
+                    if (current == null) {
+                        close();
+                        return null;
+                    }
+                    runtimeContext.setCorrelateVariableValue(variableId, current);
+                    rhsIter = rhs.iterator();
+                    rhsCurrent = rhsIter.next();
+                    if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi))
+                            || (rhsCurrent != null && joinType == JoinType.Anti)) {
+                        rhsIter.close();
+                        rhsIter = null;
+                    }
+                }
+                
+                Tuple joined;
+                try {
+                    joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
+                            current : TupleProjector.mergeProjectedValue(
+                                    convertLhs(current), joinedSchema, destBitSet,
+                                    rhsCurrent, rhsSchema, rhsBitSet, rhsFieldPosition);
+                } catch (IOException e) {
+                    throw new SQLException(e);
+                }
+                                
+                if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) {
+                    rhsIter.close();
+                    rhsIter = null;
+                }
+                
+                return joined;
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+            }
+            
+            private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException {
+                ProjectedValueTuple t;
+                if (lhs instanceof ProjectedValueTuple) {
+                    t = (ProjectedValueTuple) lhs;
+                } else {
+                    ImmutableBytesWritable ptr = getContext().getTempPtr();
+                    TupleProjector.decodeProjectedValue(lhs, ptr);
+                    lhsBitSet.clear();
+                    lhsBitSet.or(ptr);
+                    int bitSetLen = lhsBitSet.getEstimatedLength();
+                    t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), 
+                            ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);
+
+                }
+                return t;
+            }
+        };
+    }
+
+    @Override
+    public Integer getLimit() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 98eb2dd..21b25d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -37,7 +37,7 @@ import org.apache.phoenix.schema.TableRef;
 public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
-        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);
+        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
         context.setScanRanges(ScanRanges.NOTHING);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2ac728d..72920b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -83,9 +83,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
+    private final int maxServerCacheTimeToLive;
     private List<SQLCloseable> dependencies;
     private HashCacheClient hashClient;
-    private int maxServerCacheTimeToLive;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
     
@@ -114,6 +114,8 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
+        this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+                QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
     }
     
     @Override
@@ -130,8 +132,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
         List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
         dependencies = Lists.newArrayList();
         if (joinInfo != null) {
-            hashClient = new HashCacheClient(delegate.getContext().getConnection());
-            maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            hashClient = hashClient != null ? 
+                    hashClient 
+                  : new HashCacheClient(delegate.getContext().getConnection());
             firstJobEndTime = new AtomicLong(0);
             keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index e7230cc..ab13e6c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -39,20 +39,20 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 public class LiteralResultIterationPlan extends BaseQueryPlan {
-    protected final Iterator<Tuple> tupleIterator;
+    protected final Iterable<Tuple> tuples;
 
     public LiteralResultIterationPlan(StatementContext context, 
             FilterableStatement statement, TableRef tableRef, RowProjector projection, 
             Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
-        this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)).iterator(), 
+        this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)), 
                 context, statement, tableRef, projection, limit, orderBy, parallelIteratorFactory);
     }
 
-    public LiteralResultIterationPlan(Iterator<Tuple> tupleIterator, StatementContext context, 
+    public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext context, 
             FilterableStatement statement, TableRef tableRef, RowProjector projection, 
             Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
-        super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory);
-        this.tupleIterator = tupleIterator;
+        super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
+        this.tuples = tuples;
     }
 
     @Override
@@ -74,6 +74,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
+            private final Iterator<Tuple> tupleIterator = tuples.iterator();
             private boolean closed = false;
             private int count = 0;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
new file mode 100644
index 0000000..89dd082
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public interface RuntimeContext {
+
+    public abstract void defineCorrelateVariable(String variableId, TableRef def);
+
+    public abstract TableRef getCorrelateVariableDef(String variableId);
+
+    public abstract void setCorrelateVariableValue(String variableId, Tuple value);
+
+    public abstract Tuple getCorrelateVariableValue(String variableId);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
new file mode 100644
index 0000000..6a1ba4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.util.Map;
+
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.collect.Maps;
+
+public class RuntimeContextImpl implements RuntimeContext {
+    Map<String, VariableEntry> correlateVariables;
+
+    public RuntimeContextImpl() {
+        this.correlateVariables = Maps.newHashMap();
+    }
+    
+    @Override
+    public void defineCorrelateVariable(String variableId, TableRef def) {
+        this.correlateVariables.put(variableId, new VariableEntry(def));
+    }
+    
+    @Override
+    public TableRef getCorrelateVariableDef(String variableId) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' undefined.");
+        
+        return entry.getDef();
+    }
+    
+    @Override
+    public void setCorrelateVariableValue(String variableId, Tuple value) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' undefined.");
+        
+        entry.setValue(value);
+    }
+
+    @Override
+    public Tuple getCorrelateVariableValue(String variableId) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' undefined.");
+        
+        return entry.getValue();
+    }
+    
+    private static class VariableEntry {
+        private final TableRef def;
+        private Tuple value;
+        
+        VariableEntry(TableRef def) {
+            this.def = def;
+        }
+        
+        TableRef getDef() {
+            return def;
+        }
+        
+        Tuple getValue() {
+            return value;
+        }
+        
+        void setValue(Tuple value) {
+            this.value = value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index e9b8a3a..9f7e482 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.ChunkedResultIterator;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -75,11 +76,15 @@ public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
-
+    
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
+        this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null);
+    }
+    
+    private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter));
+                        buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter);
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 1bbda07..297b6cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -83,6 +83,7 @@ public class SortMergeJoinPlan implements QueryPlan {
     private final KeyValueSchema rhsSchema;
     private final int rhsFieldPosition;
     private final boolean isSingleValueOnly;
+    private final int thresholdBytes;
 
     public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, 
             JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@ -101,6 +102,8 @@ public class SortMergeJoinPlan implements QueryPlan {
         this.rhsSchema = buildSchema(rhsTable);
         this.rhsFieldPosition = rhsFieldPosition;
         this.isSingleValueOnly = isSingleValueOnly;
+        this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
     }
 
     private static KeyValueSchema buildSchema(PTable table) {
@@ -244,8 +247,6 @@ public class SortMergeJoinPlan implements QueryPlan {
             int len = lhsBitSet.getEstimatedLength();
             this.emptyProjectedValue = new byte[len];
             lhsBitSet.toBytes(emptyProjectedValue, 0);
-            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
             this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
             this.queueIterator = null;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index c4a6b20..c8fef3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -47,7 +48,7 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator() throws SQLException {
-        return new UnnestArrayResultIterator(delegate.iterator());
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
 
     @Override
@@ -61,6 +62,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
         planSteps.add("UNNEST");
         return new ExplainPlan(planSteps);
     }
+    
+    @Override
+    public Integer getLimit() {
+        return null;
+    }
 
     public class UnnestArrayResultIterator extends DelegateResultIterator {
         private final UnnestArrayElemRefExpression elemRefExpression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
new file mode 100644
index 0000000..7ba43c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+
+public class CorrelateVariableFieldAccessExpression extends BaseTerminalExpression {
+    private final RuntimeContext runtimeContext;
+    private final String variableId;
+    private final Expression fieldAccessExpression;
+    
+    public CorrelateVariableFieldAccessExpression(RuntimeContext context, String variableId, Expression fieldAccessExpression) {
+        super();
+        this.runtimeContext = context;
+        this.variableId = variableId;
+        this.fieldAccessExpression = fieldAccessExpression;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Tuple variable = runtimeContext.getCorrelateVariableValue(variableId);
+        if (variable == null)
+            throw new RuntimeException("Variable '" + variableId + "' not set.");
+        
+        return fieldAccessExpression.evaluate(variable, ptr);
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean success = evaluate(null, ptr);
+        Object value = success ? getDataType().toObject(ptr) : null;
+        try {
+            LiteralExpression expr = LiteralExpression.newConstant(value, getDataType());
+            expr.write(output);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+    @Override
+    public PDataType getDataType() {
+        return this.fieldAccessExpression.getDataType();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index dbe2509..6f1d855 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -287,7 +287,7 @@ public enum ExpressionType {
      * Return the ExpressionType for a given Expression instance
      */
     public static ExpressionType valueOf(Expression expression) {
-        ExpressionType type = classToEnumMap.get(expression.getClass());
+        ExpressionType type = valueOfOrNull(expression);
         if (type == null) { // FIXME: this exception gets swallowed and retries happen
             throw new IllegalArgumentException("No ExpressionType for " + expression.getClass());
         }
@@ -299,7 +299,14 @@ public enum ExpressionType {
      * or null if none exists.
      */
     public static ExpressionType valueOfOrNull(Expression expression) {
-        return classToEnumMap.get(expression.getClass());
+        Class <? extends Expression> clazz = expression.getClass();
+        // We will not have CorrelateVariableFieldAccessExpression on the server side,
+        // it will be evaluated at client side and will be serialized as 
+        // LiteralExpression instead.
+        if (clazz == CorrelateVariableFieldAccessExpression.class) {
+            clazz = LiteralExpression.class;
+        }
+        return classToEnumMap.get(clazz);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
index 18b8795..8d14545 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -61,6 +62,11 @@ public abstract class CloneExpressionVisitor extends TraverseAllExpressionVisito
     }
 
     @Override
+    public Expression visit(CorrelateVariableFieldAccessExpression node) {
+        return node;
+    }
+
+    @Override
     public Expression visit(LiteralExpression node) {
         return node;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
index 0a8d3ad..31f340d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -108,6 +109,7 @@ public interface ExpressionVisitor<E> {
     public Iterator<Expression> visitEnter(ArrayConstructorExpression node);
     public E visitLeave(ArrayConstructorExpression node, List<E> l);
     
+    public E visit(CorrelateVariableFieldAccessExpression node);
     public E visit(LiteralExpression node);
     public E visit(RowKeyColumnExpression node);
     public E visit(KeyValueColumnExpression node);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
index e7e7c67..3b7067a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
@@ -26,7 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -99,6 +101,11 @@ public class StatelessTraverseAllExpressionVisitor<E> extends TraverseAllExpress
     }
 
     @Override
+    public E visit(CorrelateVariableFieldAccessExpression node) {
+        return null;
+    }
+
+    @Override
     public E visit(LiteralExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
index 019754f..83b28bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
@@ -26,7 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -99,6 +101,11 @@ public class StatelessTraverseNoExpressionVisitor<E> extends TraverseNoExpressio
     }
 
     @Override
+    public E visit(CorrelateVariableFieldAccessExpression node) {
+        return null;
+    }
+
+    @Override
     public E visit(LiteralExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
index 797f3ce..e3d0987 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -39,7 +39,8 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult
         Tuple result = super.next();
         // Ensure ungrouped aggregregation always returns a row, even if the underlying iterator doesn't.
         if (result == null && !hasRows) {
-            // Generate value using unused ClientAggregators
+            // We should reset ClientAggregators here in case they are being reused in a new ResultIterator.
+            aggregators.reset(aggregators.getAggregators());
             byte[] value = aggregators.toBytes(aggregators.getAggregators());
             result = new SingleKeyValueTuple(
                     KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
new file mode 100644
index 0000000..7ae3757
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.JoinCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CorrelatePlanTest {
+    
+    private static final StatementContext CONTEXT;
+    static {
+        try {
+            PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
+            PhoenixStatement stmt = new PhoenixStatement(connection);
+            ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
+            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private static final Object[][] LEFT_RELATION = new Object[][] {
+            {1, "1"},
+            {2, "2"},
+            {3, "3"},
+            {4, "4"},
+            {5, "5"},
+    };
+    
+    private static final Object[][] RIGHT_RELATION = new Object[][] {
+            {"2", 20},
+            {"2", 40},
+            {"5", 50},
+            {"6", 60},
+            {"5", 100},
+            {"1", 10},
+            {"3", 30},
+    };        
+    
+    @Test
+    public void testCorrelatePlanWithInnerJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+                {3, "3", "3", 30},
+                {5, "5", "5", 50},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithLeftJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+                {3, "3", "3", 30},
+                {4, "4", null, null},
+                {5, "5", "5", 50},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Left, expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithSemiJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1"},
+                {2, "2"},
+                {3, "3"},
+                {5, "5"},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Semi, expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithAntiJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {4, "4"},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Anti, expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithSingleValueOnly() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+        };
+        try {
+            testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS.getErrorCode(), e.getErrorCode());
+        }
+        
+        Object[][] rightRelation = new Object[][] {
+                {"2", 20},
+                {"6", 60},
+                {"5", 100},
+                {"1", 10},
+        };        
+        expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected);        
+    }
+    
+    private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, Object[][] expectedResult) throws SQLException {        
+        TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]);
+        TableRef rightTable = createProjectedTableFromLiterals(rightRelation[0]);
+        String varName = "$cor0";
+        RuntimeContext runtimeContext = new RuntimeContextImpl();
+        runtimeContext.defineCorrelateVariable(varName, leftTable);
+        QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation);
+        QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation);
+        Expression columnExpr = new ColumnRef(rightTable, rightCorrelColumn).newColumnExpression();
+        Expression fieldAccess = new CorrelateVariableFieldAccessExpression(runtimeContext, varName, new ColumnRef(leftTable, leftCorrelColumn).newColumnExpression());
+        Expression filter = ComparisonExpression.create(CompareOp.EQUAL, Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false);
+        rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, rightTable, RowProjector.EMPTY_PROJECTOR, null, filter, OrderBy.EMPTY_ORDER_BY, rightPlan);
+        PTable joinedTable = JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), type);
+        CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), rightTable.getTable(), leftTable.getTable().getColumns().size());
+        ResultIterator iter = correlatePlan.iterator();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        for (Object[] row : expectedResult) {
+            Tuple next = iter.next();
+            assertNotNull(next);
+            for (int i = 0; i < row.length; i++) {
+                PColumn column = joinedTable.getColumns().get(i);
+                boolean eval = new ProjectedColumnExpression(column, joinedTable, column.getName().getString()).evaluate(next, ptr);
+                Object o = eval ? column.getDataType().toObject(ptr) : null;
+                assertEquals(row[i], o);
+            }
+        }
+    }
+    
+    private QueryPlan newLiteralResultIterationPlan(Object[][] rows) {
+        List<Tuple> tuples = Lists.newArrayList();
+        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
+        for (Object[] row : rows) {
+            Expression[] exprs = new Expression[row.length];
+            for (int i = 0; i < row.length; i++) {
+                exprs[i] = LiteralExpression.newConstant(row[i]);
+            }
+            TupleProjector projector = new TupleProjector(exprs);
+            tuples.add(projector.projectResults(baseTuple));
+        }
+        
+        return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+    }
+
+
+    private TableRef createProjectedTableFromLiterals(Object[] row) {
+        List<PColumn> columns = Lists.<PColumn>newArrayList();
+        for (int i = 0; i < row.length; i++) {
+            String name = ParseNodeFactory.createTempAlias();
+            Expression expr = LiteralExpression.newConstant(row[i]);
+            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
+                    i, expr.getSortOrder(), null, null, false, name));
+        }
+        try {
+            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
+                    PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+                    null, null, columns, null, null, Collections.<PTable>emptyList(),
+                    false, Collections.<PName>emptyList(), null, null, false, false, false, null,
+                    null, null, true);
+            TableRef sourceTable = new TableRef(pTable);
+            List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
+            for (PColumn column : sourceTable.getTable().getColumns()) {
+                sourceColumnRefs.add(new ColumnRef(sourceTable, column.getPosition()));
+            }
+        
+            return new TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, sourceColumnRefs, false));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }        
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2acb38a9/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index 0def172..896f920 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -67,13 +67,13 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("rawtypes")
 public class UnnestArrayPlanTest {
     
-    private static final StatementContext context;
+    private static final StatementContext CONTEXT;
     static {
         try {
             PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
             PhoenixStatement stmt = new PhoenixStatement(connection);
             ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
-            context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
@@ -112,7 +112,7 @@ public class UnnestArrayPlanTest {
     private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> arrays, boolean withOrdinality) throws Exception {
         PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
         List<Tuple> tuples = toTuples(arrayType, arrays);
-        LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples.iterator(), context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+        LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
         LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
         RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
         UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);


Mime
View raw message