phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [39/51] [partial] Initial commit
Date Mon, 27 Jan 2014 19:23:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
new file mode 100644
index 0000000..dcf95ce
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.DegenerateQueryPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * 
+ * Class used to build an executable query plan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class QueryCompiler {
+    /* 
+     * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't 
+     * want to introduce a dependency on 0.94.5 (where this feature was
+     * introduced). This will do the same thing. Once we do have a 
+     * dependency on 0.94.5 or above, switch this around.
+     */
+    private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_";
+    private final PhoenixStatement statement;
+    private final Scan scan;
+    private final List<? extends PDatum> targetColumns;
+    private final ParallelIteratorFactory parallelIteratorFactory;
+
+    public QueryCompiler(PhoenixStatement statement) throws SQLException {
+        this(statement, Collections.<PDatum>emptyList(), null);
+    }
+    
+    public QueryCompiler(PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
+        this.statement = statement;
+        this.scan = new Scan();
+        this.targetColumns = targetColumns;
+        this.parallelIteratorFactory = parallelIteratorFactory;
+        if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+            this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
+        }
+    }
+
+    /**
+     * Builds an executable query plan from a parsed SQL statement
+     * @param select parsed SQL statement
+     * @return executable query plan
+     * @throws SQLException if mismatched types are found, bind values do not match binds,
+     * or invalid function arguments are encountered.
+     * @throws SQLFeatureNotSupportedException if an unsupported construct is encountered
+     * @throws TableNotFoundException if table name not found in schema
+     * @throws ColumnNotFoundException if column name could not be resolved
+     * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
+     */
+    public QueryPlan compile(SelectStatement select) throws SQLException {
+        PhoenixConnection connection = statement.getConnection();
+        List<Object> binds = statement.getParameters();
+        ColumnResolver resolver = FromCompiler.getResolver(select, connection);
+        select = StatementNormalizer.normalize(select, resolver);
+        TableRef tableRef = resolver.getTables().get(0);
+        StatementContext context = new StatementContext(select, connection, resolver, binds, scan);
+        // Short circuit out if we're compiling an index query and the index isn't active.
+        // We must do this after the ColumnResolver resolves the table, as we may be updating the local
+        // cache of the index table and it may now be inactive.
+        if (tableRef.getTable().getType() == PTableType.INDEX && tableRef.getTable().getIndexState() != PIndexState.ACTIVE) {
+            return new DegenerateQueryPlan(context, select, tableRef);
+        }
+        Integer limit = LimitCompiler.compile(context, select);
+
+        GroupBy groupBy = GroupByCompiler.compile(context, select);
+        // Optimize the HAVING clause by finding any group by expressions that can be moved
+        // to the WHERE clause
+        select = HavingCompiler.rewrite(context, select, groupBy);
+        Expression having = HavingCompiler.compile(context, select, groupBy);
+        // Don't pass groupBy when building where clause expression, because we do not want to wrap these
+        // expressions as group by key expressions since they're pre-filtered, not post-filtered.
+        WhereCompiler.compile(context, select);
+        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); 
+        RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, targetColumns);
+        
+        // Final step is to build the query plan
+        int maxRows = statement.getMaxRows();
+        if (maxRows > 0) {
+            if (limit != null) {
+                limit = Math.min(limit, maxRows);
+            } else {
+                limit = maxRows;
+            }
+        }
+        if (select.isAggregate() || select.isDistinct()) {
+            return new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having);
+        } else {
+            return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
+        }
+    }
+}
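
For context, a rough usage sketch of the class above (not part of this commit): an already-parsed SelectStatement is compiled against a PhoenixStatement into a QueryPlan, and the plan's Scanner is used to iterate the results. How the SelectStatement is produced (e.g. by the SQL parser) is assumed here.

    import java.sql.SQLException;

    import org.apache.phoenix.compile.QueryCompiler;
    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.parse.SelectStatement;
    import org.apache.phoenix.query.Scanner;

    // Illustrative helper only: compile a parsed SELECT and hand back its result Scanner.
    final class QueryCompilerSketch {
        private QueryCompilerSketch() {}

        static Scanner compileAndScan(PhoenixStatement stmt, SelectStatement select) throws SQLException {
            QueryPlan plan = new QueryCompiler(stmt).compile(select); // builds a ScanPlan or AggregatePlan
            return plan.getScanner();                                 // results are iterated via the plan
        }
    }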

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/src/main/java/org/apache/phoenix/compile/QueryPlan.java
new file mode 100644
index 0000000..6cf893c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ * 
+ * Interface for an executable query plan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface QueryPlan extends StatementPlan {
+    /**
+     * Get a scanner to iterate over the results
+     * @return scanner for iterating over the results
+     * @throws SQLException
+     */
+    Scanner getScanner() throws SQLException;
+    
+    // TODO: change once joins are supported
+    TableRef getTableRef();
+    /**
+     * Returns the projector used to formulate the ResultSet row
+     */
+    RowProjector getProjector();
+    
+    Integer getLimit();
+
+    OrderBy getOrderBy();
+
+    GroupBy getGroupBy();
+
+    List<KeyRange> getSplits();
+
+    StatementContext getContext();
+    
+    FilterableStatement getStatement();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/RowProjector.java b/src/main/java/org/apache/phoenix/compile/RowProjector.java
new file mode 100644
index 0000000..6b6344c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ * 
+ * Class that manages a set of projected columns accessed through the zero-based
+ * column index for a SELECT clause projection. The column index may be looked up
+ * via the name using {@link #getColumnIndex(String)}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class RowProjector {
+    public static final RowProjector EMPTY_PROJECTOR = new RowProjector(Collections.<ColumnProjector>emptyList(),0, true);
+
+    private final List<? extends ColumnProjector> columnProjectors;
+    private final Map<String,Integer> reverseIndex;
+    private final boolean allCaseSensitive;
+    private final boolean someCaseSensitive;
+    private final int estimatedSize;
+    private final boolean isProjectEmptyKeyValue;
+    
+    public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) {
+        this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
+    }
+    /**
+     * Construct a RowProjector based on a list of ColumnProjectors.
+     * @param columnProjectors ordered list of ColumnProjectors corresponding to the
+     * projected columns in the SELECT clause
+     * @param estimatedRowSize estimated byte size of a projected row
+     * @param isProjectEmptyKeyValue whether the empty key value should be projected
+     */
+    public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) {
+        this.columnProjectors = Collections.unmodifiableList(columnProjectors);
+        int position = columnProjectors.size();
+        reverseIndex = Maps.newHashMapWithExpectedSize(position);
+        boolean allCaseSensitive = true;
+        boolean someCaseSensitive = false;
+        for (--position; position >= 0; position--) {
+            ColumnProjector colProjector = columnProjectors.get(position);
+            allCaseSensitive &= colProjector.isCaseSensitive();
+            someCaseSensitive |= colProjector.isCaseSensitive();
+            reverseIndex.put(colProjector.getName(), position);
+        }
+        this.allCaseSensitive = allCaseSensitive;
+        this.someCaseSensitive = someCaseSensitive;
+        this.estimatedSize = estimatedRowSize;
+        this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
+    }
+    
+    public boolean isProjectEmptyKeyValue() {
+        return isProjectEmptyKeyValue;
+    }
+    
+    public List<? extends ColumnProjector> getColumnProjectors() {
+        return columnProjectors;
+    }
+    
+    public int getColumnIndex(String name) throws SQLException {
+        if (!someCaseSensitive) {
+            name = SchemaUtil.normalizeIdentifier(name);
+        }
+        Integer index = reverseIndex.get(name);
+        if (index == null) {
+            if (!allCaseSensitive && someCaseSensitive) {
+                name = SchemaUtil.normalizeIdentifier(name);
+                index = reverseIndex.get(name);
+            }
+            if (index == null) {
+                throw new ColumnNotFoundException(name);
+            }
+        }
+        return index;
+    }
+    
+    public ColumnProjector getColumnProjector(int index) {
+        return columnProjectors.get(index);
+    }
+ 
+    public int getColumnCount() {
+        return columnProjectors.size();
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder("[");
+        for (ColumnProjector projector : columnProjectors) {
+            buf.append(projector.getExpression());
+            buf.append(',');
+        }
+        buf.setCharAt(buf.length()-1, ']');
+        return buf.toString();
+    }
+
+    public int getEstimatedRowByteSize() {
+        return estimatedSize;
+    }
+}
\ No newline at end of file
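
The name-to-index lookup above is a reverse index keyed by projected column name, with a normalization fallback when the projection mixes case-sensitive and case-insensitive names. A standalone sketch of the same pattern, independent of the Phoenix classes (the upper-casing normalizer here merely stands in for SchemaUtil.normalizeIdentifier):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Locale;
    import java.util.Map;

    // Toy mirror of RowProjector.getColumnIndex(String): exact match first, normalized name second.
    final class ReverseIndexSketch {
        private final Map<String, Integer> reverseIndex = new HashMap<>();

        ReverseIndexSketch(List<String> projectedNames) {
            for (int i = 0; i < projectedNames.size(); i++) {
                reverseIndex.put(projectedNames.get(i), i);
            }
        }

        int getColumnIndex(String name) {
            Integer index = reverseIndex.get(name);
            if (index == null) {
                index = reverseIndex.get(name.toUpperCase(Locale.ROOT)); // stand-in normalization
            }
            if (index == null) {
                throw new IllegalArgumentException("Undefined column: " + name);
            }
            return index;
        }
    }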

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/src/main/java/org/apache/phoenix/compile/ScanRanges.java
new file mode 100644
index 0000000..44f4473
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.util.ScanUtil;
+
+
+public class ScanRanges {
+    private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
+    private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
+    public static final ScanRanges EVERYTHING = new ScanRanges(EVERYTHING_RANGES,null,false);
+    public static final ScanRanges NOTHING = new ScanRanges(NOTHING_RANGES,null,false);
+
+    public static ScanRanges create(List<List<KeyRange>> ranges, RowKeySchema schema) {
+        return create(ranges, schema, false);
+    }
+    
+    public static ScanRanges create(List<List<KeyRange>> ranges, RowKeySchema schema, boolean forceRangeScan) {
+        if (ranges.isEmpty()) {
+            return EVERYTHING;
+        } else if (ranges.size() == 1 && ranges.get(0).size() == 1 && ranges.get(0).get(0) == KeyRange.EMPTY_RANGE) {
+            return NOTHING;
+        }
+        return new ScanRanges(ranges, schema, forceRangeScan);
+    }
+
+    private SkipScanFilter filter;
+    private final List<List<KeyRange>> ranges;
+    private final RowKeySchema schema;
+    private final boolean forceRangeScan;
+
+    private ScanRanges (List<List<KeyRange>> ranges, RowKeySchema schema, boolean forceRangeScan) {
+        List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+        for (int i = 0; i < ranges.size(); i++) {
+            List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+            Collections.sort(sorted, KeyRange.COMPARATOR);
+            sortedRanges.add(ImmutableList.copyOf(sorted));
+        }
+        this.ranges = ImmutableList.copyOf(sortedRanges);
+        this.schema = schema;
+        if (schema != null && !ranges.isEmpty()) {
+            this.filter = new SkipScanFilter(this.ranges, schema);
+        }
+        this.forceRangeScan = forceRangeScan;
+    }
+
+    public SkipScanFilter getSkipScanFilter() {
+        return filter;
+    }
+    
+    public List<List<KeyRange>> getRanges() {
+        return ranges;
+    }
+
+    public RowKeySchema getSchema() {
+        return schema;
+    }
+
+    public boolean isEverything() {
+        return this == EVERYTHING;
+    }
+
+    public boolean isDegenerate() {
+        return this == NOTHING;
+    }
+    
+    /**
+     * Use SkipScanFilter under two circumstances:
+     * 1) If we have multiple ranges for a given key slot (use of IN)
+     * 2) If we have a range (i.e. not a single/point key) that is
+     *    not the last key slot
+     */
+    public boolean useSkipScanFilter() {
+        if (forceRangeScan) {
+            return false;
+        }
+        boolean hasRangeKey = false, useSkipScan = false;
+        for (List<KeyRange> orRanges : ranges) {
+            useSkipScan |= orRanges.size() > 1 | hasRangeKey;
+            if (useSkipScan) {
+                return true;
+            }
+            for (KeyRange range : orRanges) {
+                hasRangeKey |= !range.isSingleKey();
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return true if this represents the full key to a single row
+     */
+    public boolean isSingleRowScan() {
+        if (schema == null || ranges.size() < schema.getMaxFields()) {
+            return false;
+        }
+        boolean isSingleKey = true;
+        for (List<KeyRange> orRanges : ranges) {
+            if (orRanges.size() > 1) {
+                return false;
+            }
+            isSingleKey &= orRanges.get(0).isSingleKey();
+        }
+        return isSingleKey;
+    }
+
+    public void setScanStartStopRow(Scan scan) {
+        if (isEverything()) {
+            return;
+        }
+        if (isDegenerate()) {
+            scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
+            scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
+            return;
+        }
+        
+        byte[] expectedKey;
+        expectedKey = ScanUtil.getMinKey(schema, ranges);
+        if (expectedKey != null) {
+            scan.setStartRow(expectedKey);
+        }
+        expectedKey = ScanUtil.getMaxKey(schema, ranges);
+        if (expectedKey != null) {
+            scan.setStopRow(expectedKey);
+        }
+    }
+
+    public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
+
+    /**
+     * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
+     * intersects with any of the scan ranges and false otherwise. We cannot pass in
+     * a KeyRange here, because the underlying compare functions expect lower inclusive
+     * and upper exclusive keys. We cannot get their next key because the key must
+     * conform to the row key schema and if a null byte is added to a lower inclusive
+     * key, it's no longer a valid, real key.
+     * @param lowerInclusiveKey lower inclusive key
+     * @param upperExclusiveKey upper exclusive key
+     * @return true if the scan range intersects with the specified lower/upper key
+     * range
+     */
+    public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
+        if (isEverything()) {
+            return true;
+        }
+        if (isDegenerate()) {
+            return false;
+        }
+        return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
+   }
+
+    @Override
+    public String toString() {
+        return "ScanRanges[" + ranges.toString() + "]";
+    }
+
+}
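
To make the useSkipScanFilter() decision above concrete, here is an illustrative sketch (not part of the commit; the RowKeySchema is left null so only the decision, not the filter itself, is exercised): two point ranges in one key slot -- the IN case -- selects the skip scan, while a single trailing range does not.

    import java.util.Collections;
    import java.util.List;

    import com.google.common.collect.Lists;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.compile.ScanRanges;
    import org.apache.phoenix.query.KeyRange;

    final class SkipScanDecisionSketch {
        public static void main(String[] args) {
            // One slot with two point keys (e.g. WHERE k IN ('a','c')): skip scan is used.
            List<List<KeyRange>> inRanges = Collections.singletonList(Lists.newArrayList(
                    KeyRange.getKeyRange(Bytes.toBytes("a"), true, Bytes.toBytes("a"), true),
                    KeyRange.getKeyRange(Bytes.toBytes("c"), true, Bytes.toBytes("c"), true)));
            System.out.println(ScanRanges.create(inRanges, null).useSkipScanFilter()); // true

            // One slot with a single range (e.g. WHERE k >= 'a' AND k < 'c'): a plain range scan suffices.
            List<List<KeyRange>> rangeOnly = Collections.singletonList(Collections.singletonList(
                    KeyRange.getKeyRange(Bytes.toBytes("a"), true, Bytes.toBytes("c"), false)));
            System.out.println(ScanRanges.create(rangeOnly, null).useSkipScanFilter()); // false
        }
    }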

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/StatementContext.java b/src/main/java/org/apache/phoenix/compile/StatementContext.java
new file mode 100644
index 0000000..5d1ee6b
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.text.Format;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+
+/**
+ *
+ * Class that keeps common state used across processing the various clauses in a
+ * top level JDBC statement such as SELECT, UPSERT, DELETE, etc.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class StatementContext {
+    private final ColumnResolver resolver;
+    private final BindManager binds;
+    private final Scan scan;
+    private final ExpressionManager expressions;
+    private final AggregationManager aggregates;
+    private final String dateFormat;
+    private final Format dateFormatter;
+    private final Format dateParser;
+    private final String numberFormat;
+    private final ImmutableBytesWritable tempPtr;
+    private final PhoenixConnection connection;
+    
+    private long currentTime = QueryConstants.UNSET_TIMESTAMP;
+    private ScanRanges scanRanges = ScanRanges.EVERYTHING;
+    private KeyRange minMaxRange = null;
+
+    public StatementContext(BindableStatement statement, PhoenixConnection connection, ColumnResolver resolver, List<Object> binds, Scan scan) {
+        this.connection = connection;
+        this.resolver = resolver;
+        this.scan = scan;
+        this.binds = new BindManager(binds, statement.getBindCount());
+        this.aggregates = new AggregationManager();
+        this.expressions = new ExpressionManager();
+        this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+        this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
+        this.dateParser = DateUtil.getDateParser(dateFormat);
+        this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+        this.tempPtr = new ImmutableBytesWritable();
+    }
+
+    public String getDateFormat() {
+        return dateFormat;
+    }
+
+    public Format getDateFormatter() {
+        return dateFormatter;
+    }
+
+    public Format getDateParser() {
+        return dateParser;
+    }
+    
+    public String getNumberFormat() {
+        return numberFormat;
+    }
+    
+    public Scan getScan() {
+        return scan;
+    }
+
+    public BindManager getBindManager() {
+        return binds;
+    }
+
+    public AggregationManager getAggregationManager() {
+        return aggregates;
+    }
+
+    public ColumnResolver getResolver() {
+        return resolver;
+    }
+
+    public ExpressionManager getExpressionManager() {
+        return expressions;
+    }
+
+
+    public ImmutableBytesWritable getTempPtr() {
+        return tempPtr;
+    }
+
+    public ScanRanges getScanRanges() {
+        return this.scanRanges;
+    }
+    
+    public void setScanRanges(ScanRanges scanRanges) {
+        setScanRanges(scanRanges, null);
+    }
+
+    public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
+        this.scanRanges = scanRanges;
+        this.scanRanges.setScanStartStopRow(scan);
+        PTable table = this.getResolver().getTables().get(0).getTable();
+        if (minMaxRange != null) {
+            // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
+            // what we need to intersect against for the HBase scan.
+            byte[] lowerRange = minMaxRange.getLowerRange();
+            if (!minMaxRange.lowerUnbound()) {
+                if (!minMaxRange.isLowerInclusive()) {
+                    lowerRange = ScanUtil.nextKey(lowerRange, table, tempPtr);
+                }
+            }
+            
+            byte[] upperRange = minMaxRange.getUpperRange();
+            if (!minMaxRange.upperUnbound()) {
+                if (minMaxRange.isUpperInclusive()) {
+                    upperRange = ScanUtil.nextKey(upperRange, table, tempPtr);
+                }
+            }
+            if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
+                minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
+            }
+            // If we're not salting, we can intersect this now with the scan range.
+            // Otherwise, we have to wait to do this when we chunk up the scan.
+            if (table.getBucketNum() == null) {
+                minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
+                scan.setStartRow(minMaxRange.getLowerRange());
+                scan.setStopRow(minMaxRange.getUpperRange());
+            }
+            this.minMaxRange = minMaxRange;
+        }
+    }
+    
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+
+    public long getCurrentTime() throws SQLException {
+        long ts = this.getResolver().getTables().get(0).getTimeStamp();
+        if (ts != QueryConstants.UNSET_TIMESTAMP) {
+            return ts;
+        }
+        if (currentTime != QueryConstants.UNSET_TIMESTAMP) {
+            return currentTime;
+        }
+        /*
+         * For an UPSERT VALUES where autocommit is off, we won't hit the server until the commit.
+         * However, if the statement has a CURRENT_DATE() call as a value, we need to know the
+         * current time at execution time. In that case, we'll call MetaDataClient.updateCache
+         * purely to bind the current time based on the server time.
+         */
+        PTable table = this.getResolver().getTables().get(0).getTable();
+        MetaDataClient client = new MetaDataClient(connection);
+        currentTime = Math.abs(client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()));
+        return currentTime;
+    }
+
+    /**
+     * Get the key range derived from row value constructor usage in where clause. These are orthogonal to the ScanRanges
+     * and form a range against which each scan is intersected.
+     */
+    public KeyRange getMinMaxRange () {
+        return minMaxRange;
+    }
+    
+    public boolean isSingleRowScan() {
+        return this.getScanRanges().isSingleRowScan() && ! (this.getScan().getFilter() instanceof FilterList);
+    }
+}
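
A note on the bound conversion in setScanRanges above (illustrative, not from the commit): HBase scans operate on half-open [startRow, stopRow) intervals, so an inclusive upper bound coming from a row value constructor (e.g. <= 'abc') is turned into an exclusive bound on the following key, and an exclusive lower bound is advanced to the next key, so that the stored minMaxRange is always lower-inclusive and upper-exclusive and can be intersected directly with the scan's start/stop rows.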

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
new file mode 100644
index 0000000..bd1f0b9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.parse.BetweenParseNode;
+import org.apache.phoenix.parse.ComparisonParseNode;
+import org.apache.phoenix.parse.LessThanOrEqualParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeRewriter;
+import org.apache.phoenix.parse.SelectStatement;
+
+
+/**
+ * 
+ * Class that creates a new select statement ensuring that a literal always occurs
+ * on the RHS (i.e. if a literal is found on the LHS, the operator is reversed and
+ * the literal is put on the RHS)
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class StatementNormalizer extends ParseNodeRewriter {
+    
+    public StatementNormalizer(ColumnResolver resolver, int expectedAliasCount) {
+        super(resolver, expectedAliasCount);
+    }
+
+    /**
+     * Rewrite the select statement by switching any constants to the right hand side
+     * of the expression.
+     * @param statement the select statement
+     * @param resolver 
+     * @return new select statement
+     * @throws SQLException 
+     */
+    public static SelectStatement normalize(SelectStatement statement, ColumnResolver resolver) throws SQLException {
+        return rewrite(statement, new StatementNormalizer(resolver, statement.getSelect().size()));
+    }
+    
+    @Override
+    public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> nodes) throws SQLException {
+         if (nodes.get(0).isConstant() && !nodes.get(1).isConstant()) {
+             List<ParseNode> normNodes = Lists.newArrayListWithExpectedSize(2);
+             normNodes.add(nodes.get(1));
+             normNodes.add(nodes.get(0));
+             nodes = normNodes;
+             node = NODE_FACTORY.comparison(node.getInvertFilterOp(), nodes.get(0), nodes.get(1));
+         }
+         return super.visitLeave(node, nodes);
+    }
+    
+    @Override
+    public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> nodes) throws SQLException {
+       
+        LessThanOrEqualParseNode lhsNode =  NODE_FACTORY.lte(node.getChildren().get(1), node.getChildren().get(0));
+        LessThanOrEqualParseNode rhsNode =  NODE_FACTORY.lte(node.getChildren().get(0), node.getChildren().get(2));
+        List<ParseNode> parseNodes = Lists.newArrayListWithExpectedSize(2);
+        parseNodes.add(this.visitLeave(lhsNode, lhsNode.getChildren()));
+        parseNodes.add(this.visitLeave(rhsNode, rhsNode.getChildren()));
+        return super.visitLeave(node, parseNodes);
+    }
+}
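
Concretely (an illustrative example, not from the commit): a predicate written as WHERE 10 >= a is rewritten to WHERE a <= 10, and a BETWEEN 1 AND 5 is rewritten into the pair of comparisons 1 <= a and a <= 5, each of which is then normalized by the same literal-to-the-RHS rule.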

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/StatementPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/StatementPlan.java b/src/main/java/org/apache/phoenix/compile/StatementPlan.java
new file mode 100644
index 0000000..e41e86a
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/StatementPlan.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+
+
+public interface StatementPlan {
+    public final StatementPlan EMPTY_PLAN = new StatementPlan() {
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return new PhoenixParameterMetaData(0);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            return ExplainPlan.EMPTY_PLAN;
+        }
+    };
+    
+    /**
+     * Returns the ParameterMetaData for the statement
+     */
+    ParameterMetaData getParameterMetaData();
+    
+    ExplainPlan getExplainPlan() throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java b/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
new file mode 100644
index 0000000..8a7a221
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
@@ -0,0 +1,194 @@
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving;
+import org.apache.phoenix.parse.CaseParseNode;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.DivideParseNode;
+import org.apache.phoenix.parse.MultiplyParseNode;
+import org.apache.phoenix.parse.SubtractParseNode;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Visitor that builds the expressions of a GROUP BY and ORDER BY clause. While traversing
+ * the parse node tree, the visitor also determines if the natural key order of the scan
+ * will match the order of the expressions. For GROUP BY, if order is preserved we can use
+ * an optimization during server-side aggregation to do the aggregation on-the-fly versus
+ * keeping track of each distinct group. We can only do this optimization if all the rows
+ * for each group will be contiguous. For ORDER BY, we can drop the ORDER BY clause if
+ * the order is preserved.
+ * 
+ */
+public class TrackOrderPreservingExpressionCompiler  extends ExpressionCompiler {
+    public enum Ordering {ORDERED, UNORDERED};
+    
+    private final List<Entry> entries;
+    private final Ordering ordering;
+    private final int positionOffset;
+    private OrderPreserving orderPreserving = OrderPreserving.YES;
+    private ColumnRef columnRef;
+    private boolean isOrderPreserving = true;
+    
+    TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy groupBy, int expectedEntrySize, Ordering ordering) {
+        super(context, groupBy);
+        positionOffset =  context.getResolver().getTables().get(0).getTable().getBucketNum() == null ? 0 : 1;
+        entries = Lists.newArrayListWithExpectedSize(expectedEntrySize);
+        this.ordering = ordering;
+    }
+    
+
+    public boolean isOrderPreserving() {
+        if (!isOrderPreserving) {
+            return false;
+        }
+        if (ordering == Ordering.UNORDERED) {
+            // Sort by position
+            Collections.sort(entries, new Comparator<Entry>() {
+                @Override
+                public int compare(Entry o1, Entry o2) {
+                    return o1.getPkPosition()-o2.getPkPosition();
+                }
+            });
+        }
+        // Determine if there are any gaps in the PK columns; if there are none, we don't
+        // need to sort in the coprocessor because the keys will already naturally be in
+        // sorted order.
+        int prevPos = positionOffset - 1;
+        OrderPreserving prevOrderPreserving = OrderPreserving.YES;
+        for (int i = 0; i < entries.size() && isOrderPreserving; i++) {
+            Entry entry = entries.get(i);
+            int pos = entry.getPkPosition();
+            isOrderPreserving &= (entry.getOrderPreserving() != OrderPreserving.NO) && (pos == prevPos || ((pos - 1 == prevPos) && (prevOrderPreserving == OrderPreserving.YES)));
+            prevPos = pos;
+            prevOrderPreserving = entries.get(i).getOrderPreserving();
+        }
+        return isOrderPreserving;
+    }
+    
+    @Override
+    protected Expression addFunction(FunctionExpression func) {
+        // Keep the minimum value between this function and the current value,
+        // so that we never increase OrderPreserving from NO or YES_IF_LAST.
+        orderPreserving = OrderPreserving.values()[Math.min(orderPreserving.ordinal(), func.preservesOrder().ordinal())];
+        return super.addFunction(func);
+    }
+
+    @Override
+    public boolean visitEnter(CaseParseNode node) throws SQLException {
+        orderPreserving = OrderPreserving.NO;
+        return super.visitEnter(node);
+    }
+    
+    @Override
+    public boolean visitEnter(DivideParseNode node) throws SQLException {
+        // A divide expression may not preserve row order.
+        // For example: GROUP BY 1/x
+        orderPreserving = OrderPreserving.NO;
+        return super.visitEnter(node);
+    }
+
+    @Override
+    public boolean visitEnter(SubtractParseNode node) throws SQLException {
+        // A subtract expression may not preserve row order.
+        // For example: GROUP BY 10 - x
+        orderPreserving = OrderPreserving.NO;
+        return super.visitEnter(node);
+    }
+
+    @Override
+    public boolean visitEnter(MultiplyParseNode node) throws SQLException {
+        // A multiply expression may not preserve row order.
+        // For example: GROUP BY -1 * x
+        orderPreserving = OrderPreserving.NO;
+        return super.visitEnter(node);
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        columnRef = null;
+        orderPreserving = OrderPreserving.YES;
+    }
+    
+    @Override
+    protected ColumnRef resolveColumn(ColumnParseNode node) throws SQLException {
+        ColumnRef ref = super.resolveColumn(node);
+        // If we encounter any non-PK column, then we can't aggregate on-the-fly
+        // because the distinct groups have no correlation to the KV column value
+        if (!SchemaUtil.isPKColumn(ref.getColumn())) {
+            orderPreserving = OrderPreserving.NO;
+        }
+        
+        if (columnRef == null) {
+            columnRef = ref;
+        } else if (!columnRef.equals(ref)) {
+            // If we encounter more than one column reference in an expression,
+            // we can't assume the result of the expression will be key ordered.
+            // For example GROUP BY a * b
+            orderPreserving = OrderPreserving.NO;
+        }
+        return ref;
+    }
+
+    public boolean addEntry(Expression expression) {
+        if (expression instanceof LiteralExpression) {
+            return false;
+        }
+        isOrderPreserving &= (orderPreserving != OrderPreserving.NO);
+        entries.add(new Entry(expression, columnRef, orderPreserving));
+        return true;
+    }
+    
+    public boolean addEntry(Expression expression, ColumnModifier modifier) {
+        // If the expression is sorted in a different order than the specified sort order
+        // then the expressions are not order preserving.
+        if (!Objects.equal(expression.getColumnModifier(), modifier)) {
+            orderPreserving = OrderPreserving.NO;
+        }
+        return addEntry(expression);
+    }
+    
+    public List<Entry> getEntries() {
+        return entries;
+    }
+
+    public static class Entry {
+        private final Expression expression;
+        private final ColumnRef columnRef;
+        private final OrderPreserving orderPreserving;
+        
+        private Entry(Expression expression, ColumnRef columnRef, OrderPreserving orderPreserving) {
+            this.expression = expression;
+            this.columnRef = columnRef;
+            this.orderPreserving = orderPreserving;
+        }
+
+        public Expression getExpression() {
+            return expression;
+        }
+
+        public int getPkPosition() {
+            return columnRef.getPKSlotPosition();
+        }
+
+        public int getColumnPosition() {
+            return columnRef.getColumnPosition();
+        }
+
+        public OrderPreserving getOrderPreserving() {
+            return orderPreserving;
+        }
+    }
+}
\ No newline at end of file
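
As an illustration (not from the commit): on a table whose row key leads with (K1, K2), GROUP BY K1, K2 follows the scan's natural key order, so the server can aggregate each group on the fly as rows stream by; GROUP BY K2 alone leaves a gap at the leading PK position, and GROUP BY K1 * 2 involves an expression that is not guaranteed to preserve key order, so in both of those cases the distinct groups must be tracked (or re-sorted) on the server.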

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
new file mode 100644
index 0000000..d5dc0f8
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -0,0 +1,630 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.optimize.QueryOptimizer;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.UpsertStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.query.Scanner;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class UpsertCompiler {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) {
+        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
+        byte[][] pkValues = new byte[table.getPKColumns().size()][];
+        // If the table uses salting, the first byte is the salt byte; set it to a
+        // placeholder zero byte here and fill in the actual salt byte later in PRowImpl.
+        if (table.getBucketNum() != null) {
+            pkValues[0] = new byte[] {0};
+        }
+        for (int i = 0; i < values.length; i++) {
+            byte[] value = values[i];
+            PColumn column = table.getColumns().get(columnIndexes[i]);
+            if (SchemaUtil.isPKColumn(column)) {
+                pkValues[pkSlotIndex[i]] = value;
+            } else {
+                columnValues.put(column, value);
+            }
+        }
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        table.newKey(ptr, pkValues);
+        mutation.put(ptr, columnValues);
+    }
+
+    private static MutationState upsertSelect(PhoenixStatement statement, 
+            TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes,
+            int[] pkSlotIndexes) throws SQLException {
+        try {
+            PhoenixConnection connection = statement.getConnection();
+            ConnectionQueryServices services = connection.getQueryServices();
+            int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+            boolean isAutoCommit = connection.getAutoCommit();
+            byte[][] values = new byte[columnIndexes.length][];
+            int rowCount = 0;
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+            PTable table = tableRef.getTable();
+            ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
+            while (rs.next()) {
+                for (int i = 0; i < values.length; i++) {
+                    PColumn column = table.getColumns().get(columnIndexes[i]);
+                    byte[] byteValue = rs.getBytes(i+1);
+                    Object value = rs.getObject(i+1);
+                    int rsPrecision = rs.getMetaData().getPrecision(i+1);
+                    Integer precision = rsPrecision == 0 ? null : rsPrecision;
+                    int rsScale = rs.getMetaData().getScale(i+1);
+                    Integer scale = rsScale == 0 ? null : rsScale;
+                    // If ColumnModifier from expression in SELECT doesn't match the
+                    // column being projected into then invert the bits.
+                    if (column.getColumnModifier() == ColumnModifier.SORT_DESC) {
+                        byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+                        byteValue = ColumnModifier.SORT_DESC.apply(byteValue, 0, tempByteValue, 0, byteValue.length);
+                    }
+                    // We are guaranteed that the two columns will have compatible types,
+                    // as we checked that before.
+                    if (!column.getDataType().isSizeCompatible(column.getDataType(),
+                            value, byteValue,
+                            precision, column.getMaxLength(), 
+                            scale, column.getScale())) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE)
+                            .setColumnName(column.getName().getString()).build().buildException();
+                    }
+                    values[i] = column.getDataType().coerceBytes(byteValue, value, column.getDataType(),
+                            precision, scale, column.getMaxLength(), column.getScale());
+                }
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation);
+                rowCount++;
+                // Commit a batch if auto commit is true and we're at our batch size
+                if (isAutoCommit && rowCount % batchSize == 0) {
+                    MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
+                    connection.getMutationState().join(state);
+                    connection.commit();
+                    mutation.clear();
+                }
+            }
+            // If auto commit is true, this last batch will be committed upon return
+            return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
+        } finally {
+            iterator.close();
+        }
+    }
+
+    private static class UpsertingParallelIteratorFactory extends MutatingParallelIteratorFactory {
+        private RowProjector projector;
+        private int[] columnIndexes;
+        private int[] pkSlotIndexes;
+
+        private UpsertingParallelIteratorFactory (PhoenixConnection connection, TableRef tableRef) {
+            super(connection, tableRef);
+        }
+
+        @Override
+        protected MutationState mutate(PhoenixConnection connection, ResultIterator iterator) throws SQLException {
+            PhoenixStatement statement = new PhoenixStatement(connection);
+            return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+        }
+        
+        public void setRowProjector(RowProjector projector) {
+            this.projector = projector;
+        }
+        public void setColumnIndexes(int[] columnIndexes) {
+            this.columnIndexes = columnIndexes;
+        }
+        public void setPkSlotIndexes(int[] pkSlotIndexes) {
+            this.pkSlotIndexes = pkSlotIndexes;
+        }
+    }
+    
+    private final PhoenixStatement statement;
+    
+    public UpsertCompiler(PhoenixStatement statement) {
+        this.statement = statement;
+    }
+    
+    public MutationPlan compile(UpsertStatement upsert) throws SQLException {
+        final PhoenixConnection connection = statement.getConnection();
+        ConnectionQueryServices services = connection.getQueryServices();
+        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final ColumnResolver resolver = FromCompiler.getResolver(upsert, connection);
+        final TableRef tableRef = resolver.getTables().get(0);
+        PTable table = tableRef.getTable();
+        if (table.getType() == PTableType.VIEW) {
+            throw new ReadOnlyTableException("Mutations not allowed for a view (" + tableRef + ")");
+        }
+        // Setup array of column indexes parallel to values that are going to be set
+        List<ColumnName> columnNodes = upsert.getColumns();
+        List<PColumn> allColumns = table.getColumns();
+
+        int[] columnIndexesToBe;
+        int[] pkSlotIndexesToBe;
+        List<PColumn> targetColumns;
+        boolean isSalted = table.getBucketNum() != null;
+        int posOffset = isSalted ? 1 : 0;
+        // Allow a full-row upsert if no columns, or only dynamic ones, are specified and the values count matches
+        int numColsInUpsert = columnNodes.size();
+        int numColsInTable = table.getColumns().size();
+        if (columnNodes.isEmpty() || numColsInUpsert == upsert.getTable().getDynamicColumns().size()) {
+            columnIndexesToBe = new int[allColumns.size() - posOffset];
+            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
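+            // No explicit column list: target every (non-salt) column in declaration order,
+            // recording the PK slot index for each primary key column.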
+            for (int i = posOffset, j = posOffset; i < allColumns.size(); i++) {
+                PColumn column = allColumns.get(i);
+                columnIndexesToBe[i-posOffset] = i;
+                targetColumns.set(i-posOffset, column);
+                if (SchemaUtil.isPKColumn(column)) {
+                    pkSlotIndexesToBe[i-posOffset] = j++;
+                }
+            }
+        } else {
+            columnIndexesToBe = new int[numColsInUpsert];
+            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
+            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
+            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
+            Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an ArrayIndexOutOfBoundsException if it's not replaced
+            Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an ArrayIndexOutOfBoundsException if it's not replaced
+            BitSet pkColumnsSet = new BitSet(table.getPKColumns().size());
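+            // Resolve each named column, remembering which PK slots are being set so that
+            // we can verify below that every non-nullable PK column is covered.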
+            for (int i = 0; i < numColsInUpsert; i++) {
+                ColumnName colName = columnNodes.get(i);
+                ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
+                columnIndexesToBe[i] = ref.getColumnPosition();
+                targetColumns.set(i, ref.getColumn());
+                if (SchemaUtil.isPKColumn(ref.getColumn())) {
+                    pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition());
+                }
+            }
+            int i = posOffset;
+            for ( ; i < table.getPKColumns().size(); i++) {
+                PColumn pkCol = table.getPKColumns().get(i);
+                if (!pkColumnsSet.get(i)) {
+                    if (!pkCol.isNullable()) {
+                        throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null");
+                    }
+                }
+            }
+        }
+        
+        List<ParseNode> valueNodes = upsert.getValues();
+        QueryPlan plan = null;
+        RowProjector rowProjectorToBe = null;
+        int nValuesToSet;
+        boolean runOnServer = false;
+        UpsertingParallelIteratorFactory upsertParallelIteratorFactoryToBe = null;
+        final boolean isAutoCommit = connection.getAutoCommit();
+        if (valueNodes == null) {
+            SelectStatement select = upsert.getSelect();
+            assert(select != null);
+            TableRef selectTableRef = FromCompiler.getResolver(select, connection).getTables().get(0);
+            boolean sameTable = tableRef.equals(selectTableRef);
+            /* We can run the upsert in a coprocessor if:
+             * 1) the INTO table matches the FROM table
+             * 2) the select query isn't doing aggregation
+             * 3) auto commit is on
+             * 4) the table is not immutable, as the client is the one that figures out the
+             *    additional puts for index tables.
+             * 5) there is no LIMIT clause
+             * Otherwise, run the query to pull the data from the server
+             * and populate the MutationState (up to a limit).
+             */
+            runOnServer = sameTable && isAutoCommit && !table.isImmutableRows() && !select.isAggregate() && !select.isDistinct() && select.getLimit() == null && table.getBucketNum() == null;
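+            // For example, an UPSERT INTO t(k, v) SELECT k, v + 1 FROM t (with auto commit on
+            // and t neither salted nor immutable) may be pushed entirely to the coprocessor,
+            // while an aggregating, DISTINCT, or LIMITed select is always run client-side.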
+            ParallelIteratorFactory parallelIteratorFactory;
+            // TODO: once MutationState is thread safe, then when auto commit is off, we can still run in parallel
+            if (select.isAggregate() || select.isDistinct() || select.getLimit() != null) {
+                parallelIteratorFactory = null;
+            } else {
+                // We can pipeline the upsert select instead of spooling everything to disk first,
+                // if we don't have any post processing that's required.
+                parallelIteratorFactory = upsertParallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRef);
+            }
+            // If we may be able to run on the server, add a hint that favors using the data table
+            // if all else is equal.
+            // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
+            // as this would disallow running on the server. We currently use the row projector we
+            // get back to figure this out.
+            HintNode hint = upsert.getHint();
+            if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+                hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
+            }
+            select = SelectStatement.create(select, hint);
+            // Pass the scan through if the upsert and select use the same table, so that the projection is computed correctly.
+            // Use the optimizer to choose the best plan.
+            plan = new QueryOptimizer(services).optimize(select, statement, targetColumns, parallelIteratorFactory);
+            runOnServer &= plan.getTableRef().equals(tableRef);
+            rowProjectorToBe = plan.getProjector();
+            nValuesToSet = rowProjectorToBe.getColumnCount();
+            // Cannot auto commit (i.e. run on the server) if doing aggregation or topN, or if the table is salted.
+            // Salting causes problems because the row may end up living in a different region.
+        } else {
+            nValuesToSet = valueNodes.size();
+        }
+        final RowProjector projector = rowProjectorToBe;
+        final UpsertingParallelIteratorFactory upsertParallelIteratorFactory = upsertParallelIteratorFactoryToBe;
+        final QueryPlan queryPlan = plan;
+        // Resize down so that the select may supply only a subset of the table's columns
+        if (columnNodes.isEmpty()) {
+            columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
+            pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
+        }
+        
+        if (nValuesToSet != columnIndexesToBe.length) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+                .setMessage("Number of columns: " + columnIndexesToBe.length + ". Number of values: " + nValuesToSet)
+                .build().buildException();
+        }
+        
+        final int[] columnIndexes = columnIndexesToBe;
+        final int[] pkSlotIndexes = pkSlotIndexesToBe;
+        
+        // TODO: break this up into multiple functions
+        ////////////////////////////////////////////////////////////////////
+        // UPSERT SELECT
+        /////////////////////////////////////////////////////////////////////
+        if (valueNodes == null) {
+            ////////////////////////////////////////////////////////////////////
+            // UPSERT SELECT run server-side (maybe)
+            /////////////////////////////////////////////////////////////////////
+            if (runOnServer) {
+                // This array will grow by at most the number of PK columns
+                int[] allColumnsIndexes = Arrays.copyOf(columnIndexes, columnIndexes.length + nValuesToSet);
+                int[] reverseColumnIndexes = new int[table.getColumns().size()];
+                List<Expression> projectedExpressions = Lists.newArrayListWithExpectedSize(reverseColumnIndexes.length);
+                Arrays.fill(reverseColumnIndexes, -1);
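+                // reverseColumnIndexes maps a column's position in the table back to its
+                // position in the projection; -1 means the column is not being projected.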
+                for (int i = 0; i < nValuesToSet; i++) {
+                    projectedExpressions.add(projector.getColumnProjector(i).getExpression());
+                    reverseColumnIndexes[columnIndexes[i]] = i;
+                }
+                /*
+                 * Order the projected columns and projected expressions so that the PK columns
+                 * lead, ordered by PK slot position.
+                 */
+                int offset = table.getBucketNum() == null ? 0 : 1;
+                for (int i = 0; i < table.getPKColumns().size() - offset; i++) {
+                    PColumn column = table.getPKColumns().get(i + offset);
+                    int pos = reverseColumnIndexes[column.getPosition()];
+                    if (pos == -1) {
+                        // The last PK column may be fixed width and nullable.
+                        // We don't want to insert a null expression because
+                        // it's not valid to set a fixed width type to null.
+                        if (column.getDataType().isFixedWidth()) {
+                            continue;
+                        }
+                        // Add literal null for missing PK columns
+                        pos = projectedExpressions.size();
+                        Expression literalNull = LiteralExpression.newConstant(null, column.getDataType());
+                        projectedExpressions.add(literalNull);
+                        allColumnsIndexes[pos] = column.getPosition();
+                    } 
+                    // Swap select expression at pos with i
+                    Collections.swap(projectedExpressions, i, pos);
+                    // Swap column indexes and reverse column indexes too
+                    int tempPos = allColumnsIndexes[i];
+                    allColumnsIndexes[i] = allColumnsIndexes[pos];
+                    allColumnsIndexes[pos] = tempPos;
+                    reverseColumnIndexes[tempPos] = reverseColumnIndexes[i];
+                    reverseColumnIndexes[i] = i;
+                }
+                // If any pk slots are changing, be conservative and don't run this server side.
+                // If the row ends up living in a different region, we'll get an error otherwise.
+                for (int i = 0; i < table.getPKColumns().size(); i++) {
+                    PColumn column = table.getPKColumns().get(i);
+                    Expression source = projectedExpressions.get(i);
+                    if (source == null || !source.equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) {
+                        // TODO: we could check the region boundaries to see if the pk will still be in it.
+                        runOnServer = false; // bail on running server side, since PK may be changing
+                        break;
+                    }
+                }
+                
+                ////////////////////////////////////////////////////////////////////
+                // UPSERT SELECT run server-side
+                /////////////////////////////////////////////////////////////////////
+                if (runOnServer) {
+                    // Iterate through columns being projected
+                    List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size());
+                    for (int i = 0; i < projectedExpressions.size(); i++) {
+                        // Must make a new column if its position has changed
+                        PColumn column = allColumns.get(allColumnsIndexes[i]);
+                        projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i));
+                    }
+                    // Build table from projectedColumns
+                    PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
+                    
+                    SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
+                    final RowProjector aggProjector = ProjectionCompiler.compile(queryPlan.getContext(), select, GroupBy.EMPTY_GROUP_BY);
+                    /*
+                     * Transfer over a PTable representing the subset of columns selected, but with all PK columns.
+                     * Move the columns setting the PK first, in pkSlot order, adding a LiteralExpression of null for any missing ones.
+                     * Transfer over the List<Expression> for the projection.
+                     * In the region scan, evaluate the expressions in order, collecting the first n columns for the PK
+                     * and collecting the non-PK columns in the mutation Map.
+                     * Create the PRow and get the mutations, adding them to the batch.
+                     */
+                    final StatementContext context = queryPlan.getContext();
+                    final Scan scan = context.getScan();
+                    scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
+                    scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
+                    // Ignore ORDER BY - it has no impact here
+                    final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
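+                    // The coprocessor performs the writes as it scans; the single row returned
+                    // by the aggregate plan carries the count of rows upserted.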
+                    return new MutationPlan() {
+    
+                        @Override
+                        public PhoenixConnection getConnection() {
+                            return connection;
+                        }
+    
+                        @Override
+                        public ParameterMetaData getParameterMetaData() {
+                            return queryPlan.getContext().getBindManager().getParameterMetaData();
+                        }
+    
+                        @Override
+                        public MutationState execute() throws SQLException {
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            tableRef.getTable().getIndexMaintainers(ptr);
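+                            // If the table has indexes, ship the serialized index maintainers to
+                            // the region servers so the coprocessor can generate index updates too.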
+                            ServerCache cache = null;
+                            try {
+                                if (ptr.getLength() > 0) {
+                                    IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                    byte[] uuidValue = cache.getId();
+                                    scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                }
+                                Scanner scanner = aggPlan.getScanner();
+                                ResultIterator iterator = scanner.iterator();
+                                try {
+                                    Tuple row = iterator.next();
+                                    final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr);
+                                    return new MutationState(maxSize, connection) {
+                                        @Override
+                                        public long getUpdateCount() {
+                                            return mutationCount;
+                                        }
+                                    };
+                                } finally {
+                                    iterator.close();
+                                }
+                            } finally {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            }
+                        }
+    
+                        @Override
+                        public ExplainPlan getExplainPlan() throws SQLException {
+                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                            planSteps.add("UPSERT ROWS");
+                            planSteps.addAll(queryPlanSteps);
+                            return new ExplainPlan(planSteps);
+                        }
+                    };
+                }
+            }
+
+            ////////////////////////////////////////////////////////////////////
+            // UPSERT SELECT run client-side
+            /////////////////////////////////////////////////////////////////////
+            return new MutationPlan() {
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return connection;
+                }
+                
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return queryPlan.getContext().getBindManager().getParameterMetaData();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    Scanner scanner = queryPlan.getScanner();
+                    ResultIterator iterator = scanner.iterator();
+                    if (upsertParallelIteratorFactory == null) {
+                        return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+                    }
+                    upsertParallelIteratorFactory.setRowProjector(projector);
+                    upsertParallelIteratorFactory.setColumnIndexes(columnIndexes);
+                    upsertParallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
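+                    // The parallel iterators perform the upserts as they scan; each tuple
+                    // returned below carries the row count written by one chunk.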
+                    Tuple tuple;
+                    long totalRowCount = 0;
+                    while ((tuple = iterator.next()) != null) { // Runs query
+                        KeyValue kv = tuple.getValue(0);
+                        totalRowCount += PDataType.LONG.getCodec().decodeLong(kv.getBuffer(), kv.getValueOffset(), null);
+                    }
+                    // Return the total number of rows that have been updated. If auto commit is off,
+                    // the mutations will all be in the mutation state of the current connection.
+                    return new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("UPSERT SELECT");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }
+                
+            };
+        }
+
+        ////////////////////////////////////////////////////////////////////
+        // UPSERT VALUES
+        /////////////////////////////////////////////////////////////////////
+        if (nValuesToSet > numColsInTable) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH)
+                .setMessage("Number of columns in table: " + numColsInTable + ". Number of values: " + nValuesToSet)
+                .build().buildException();
+        }
+        
+        int nodeIndex = 0;
+        // Allocate the values array for the columns being set; values for nullable
+        // columns that aren't specified are simply not set.
+        final StatementContext context = new StatementContext(upsert, connection, resolver, statement.getParameters(), new Scan());
+        UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context);
+        final byte[][] values = new byte[nValuesToSet][];
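+        // Evaluate each constant value at compile time, checking type and size compatibility
+        // and coercing the bytes to the target column's type and scale.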
+        for (ParseNode valueNode : valueNodes) {
+            if (!valueNode.isConstant()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException();
+            }
+            PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+            expressionBuilder.setColumn(column);
+            LiteralExpression literalExpression = (LiteralExpression)valueNode.accept(expressionBuilder);
+            byte[] byteValue = literalExpression.getBytes();
+            if (literalExpression.getDataType() != null) {
+                // If the ColumnModifier of the value expression doesn't match that of the
+                // column being set, then invert the bits.
+                if (literalExpression.getColumnModifier() != column.getColumnModifier()) {
+                    byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+                    byteValue = ColumnModifier.SORT_DESC.apply(byteValue, 0, tempByteValue, 0, byteValue.length);
+                }
+                if (!literalExpression.getDataType().isCoercibleTo(column.getDataType(), literalExpression.getValue())) { 
+                    throw new TypeMismatchException(
+                        literalExpression.getDataType(), column.getDataType(), "expression: "
+                                + literalExpression.toString() + " in column " + column);
+                }
+                if (!column.getDataType().isSizeCompatible(literalExpression.getDataType(),
+                        literalExpression.getValue(), byteValue, literalExpression.getMaxLength(),
+                        column.getMaxLength(), literalExpression.getScale(), column.getScale())) { 
+                    throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE).setColumnName(column.getName().getString())
+                        .setMessage("value=" + literalExpression.toString()).build().buildException();
+                }
+            }
+            byteValue = column.getDataType().coerceBytes(byteValue, literalExpression.getValue(),
+                    literalExpression.getDataType(), literalExpression.getMaxLength(), literalExpression.getScale(),
+                    column.getMaxLength(), column.getScale());
+            values[nodeIndex] = byteValue;
+            nodeIndex++;
+        }
+        return new MutationPlan() {
+
+            @Override
+            public PhoenixConnection getConnection() {
+                return connection;
+            }
+
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public MutationState execute() {
+                Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+                setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation);
+                return new MutationState(tableRef, mutation, 0, maxSize, connection);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("PUT SINGLE ROW"));
+            }
+
+        };
+    }
+    
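+    /**
+     * Expression compiler for UPSERT VALUES: top-level bind variables and literals are turned
+     * into constants typed according to the target column's data type and column modifier.
+     */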
+    private static final class UpsertValuesCompiler extends ExpressionCompiler {
+        private PColumn column;
+        
+        private UpsertValuesCompiler(StatementContext context) {
+            super(context);
+        }
+
+        public void setColumn(PColumn column) {
+            this.column = column;
+        }
+        
+        @Override
+        public Expression visit(BindParseNode node) throws SQLException {
+            if (isTopLevel()) {
+                context.getBindManager().addParamMetaData(node, column);
+                Object value = context.getBindManager().getBindValue(node);
+                return LiteralExpression.newConstant(value, column.getDataType(), column.getColumnModifier());
+            }
+            return super.visit(node);
+        }    
+        
+        @Override
+        public Expression visit(LiteralParseNode node) throws SQLException {
+            if (isTopLevel()) {
+                return LiteralExpression.newConstant(node.getValue(), column.getDataType(), column.getColumnModifier());
+            }
+            return super.visit(node);
+        }
+    }
+}

