phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [31/51] [partial] Initial commit of master branch from github
Date Mon, 27 Jan 2014 22:15:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
new file mode 100644
index 0000000..67d7e99
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
@@ -0,0 +1,87 @@
+package org.apache.phoenix.expression;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A container for an expression that appears in an ORDER BY clause, together with its sort direction and null ordering.
+ */
+public class OrderByExpression implements Writable {
+    // Expression being ordered on. Null only on an instance created through the
+    // no-arg constructor that has not yet been populated by readFields().
+    private Expression expression;
+    // Rendered as " NULLS LAST" by toString() when true.
+    private boolean isNullsLast;
+    // Rendered as " DESC" by toString() when false.
+    private boolean isAscending;
+
+    /**
+     * Creates an empty instance; its state is filled in by {@link #readFields(DataInput)}.
+     */
+    public OrderByExpression() {
+    }
+
+    /**
+     * @param expression the expression to order by; must not be null
+     * @param isNullsLast true if the clause specifies NULLS LAST
+     * @param isAscending true for ascending order, false for descending
+     * @throws NullPointerException if expression is null
+     */
+    public OrderByExpression(Expression expression, boolean isNullsLast, boolean isAscending) {
+        checkNotNull(expression);
+        this.expression = expression;
+        this.isNullsLast = isNullsLast;
+        this.isAscending = isAscending;
+    }
+
+    /** @return the expression being ordered on, or null if this instance has not been populated yet */
+    public Expression getExpression() {
+        return expression;
+    }
+
+    public boolean isNullsLast() {
+        return isNullsLast;
+    }
+
+    public boolean isAscending() {
+        return isAscending;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and agree on both flags
+     * and on the expression. A not-yet-populated (null) expression only equals another
+     * null expression instead of raising a NullPointerException.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || this.getClass() != o.getClass()) {
+            return false;
+        }
+        OrderByExpression that = (OrderByExpression)o;
+        return isNullsLast == that.isNullsLast
+            && isAscending == that.isAscending
+            && (expression == null ? that.expression == null : expression.equals(that.expression));
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}; a null expression contributes 0 so that
+     * hashing a default-constructed instance does not throw.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (isNullsLast ? 0 : 1);
+        result = prime * result + (isAscending ? 0 : 1);
+        result = prime * result + (expression == null ? 0 : expression.hashCode());
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return this.getExpression() + (isAscending ? "" : " DESC") + (isNullsLast ? " NULLS LAST" : "");
+    }
+
+    /**
+     * Reads the state in the order produced by {@link #write(DataOutput)}: the two flags,
+     * then the ordinal of the expression's {@code ExpressionType}, then the expression body.
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.isNullsLast = input.readBoolean();
+        this.isAscending = input.readBoolean();
+        // The ordinal selects which concrete expression class to instantiate before
+        // delegating the rest of the stream to it.
+        expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+        expression.readFields(input);
+    }
+
+    /**
+     * Writes the two flags, the ordinal of the expression's {@code ExpressionType} as a
+     * variable-length int, and finally the expression itself.
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeBoolean(isNullsLast);
+        output.writeBoolean(isAscending);
+        WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+        expression.write(output);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
new file mode 100644
index 0000000..2262882
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class ProjectedColumnExpression extends ColumnExpression {
+    // Schema built from the table's non-PK columns (see buildSchema).
+    private KeyValueSchema schema;
+    // Scratch bit set sized for the schema; cleared and reloaded on every evaluate() call.
+    ValueBitSet bitSet;
+    // Column position minus the number of PK columns, i.e. the index used with the schema.
+    private int position;
+    private byte[] name; // for display purpose only
+
+    /**
+     * Creates an empty instance; its state is filled in by {@link #readFields(DataInput)}.
+     */
+    public ProjectedColumnExpression() {
+    }
+
+    /**
+     * @param column the projected column this expression reads
+     * @param table the table whose non-PK columns define the value schema
+     */
+    public ProjectedColumnExpression(PColumn column, PTable table) {
+        super(column);
+        this.schema = buildSchema(table);
+        this.bitSet = ValueBitSet.newInstance(schema);
+        this.position = column.getPosition() - table.getPKColumns().size();
+        this.name = column.getName().getBytes();
+    }
+
+    /**
+     * Builds a schema containing one field per column of the table that is not a PK column,
+     * in the order the table returns its columns.
+     */
+    private static KeyValueSchema buildSchema(PTable table) {
+        final KeyValueSchemaBuilder schemaBuilder = new KeyValueSchemaBuilder(0);
+        for (PColumn candidate : table.getColumns()) {
+            if (SchemaUtil.isPKColumn(candidate)) {
+                continue;
+            }
+            schemaBuilder.addField(candidate);
+        }
+        return schemaBuilder.build();
+    }
+
+    public KeyValueSchema getSchema() {
+        return schema;
+    }
+
+    public int getPosition() {
+        return position;
+    }
+
+    public byte[] getName() {
+        return name;
+    }
+
+    /** Hash derived from the schema and the position only. */
+    @Override
+    public int hashCode() {
+        final int schemaPart = 31 + schema.hashCode();
+        return 31 * schemaPart + position;
+    }
+
+    /** Equal when the other object has the same class, an equal schema and the same position. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final ProjectedColumnExpression that = (ProjectedColumnExpression)obj;
+        return schema.equals(that.schema) && position == that.position;
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toString(name);
+    }
+
+    /**
+     * Has {@code ScanProjector} decode the projected value of the tuple into ptr, reloads the
+     * bit set from it, and then asks the schema to move ptr to the field at this column's position.
+     *
+     * @return true only if the schema reports a value at the position; false when it reports
+     *         none, returns null, or an IOException is raised along the way
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        try {
+            ScanProjector.decodeProjectedValue(tuple, ptr);
+            // Capture the end of the decoded value before ptr is repositioned below.
+            final int endOffset = ptr.getOffset() + ptr.getLength();
+            bitSet.clear();
+            bitSet.or(ptr);
+            schema.iterator(ptr, position, bitSet);
+            final Boolean found = schema.next(ptr, position, endOffset, bitSet);
+            return found != null && found.booleanValue();
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /** Reads the superclass state, then the schema, the position and the display name. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        schema = new KeyValueSchema();
+        schema.readFields(input);
+        // The bit set is not serialized; it is recreated from the schema just read.
+        bitSet = ValueBitSet.newInstance(schema);
+        position = input.readInt();
+        name = Bytes.readByteArray(input);
+    }
+
+    /** Writes the superclass state, then the schema, the position and the display name. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        schema.write(output);
+        output.writeInt(position);
+        Bytes.writeByteArray(output, name);
+    }
+
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
new file mode 100644
index 0000000..383ef2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyColumnExpression.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+
+/**
+ * 
+ * Class to access a value stored in the row key
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class RowKeyColumnExpression extends ColumnExpression {
+    // Type the bytes are coerced from in evaluate(); reset to this expression's own type
+    // by readFields().
+    private PDataType fromType;
+    // Locates this column's bytes inside a row key. Null until set by a constructor
+    // or by readFields().
+    private RowKeyValueAccessor accessor;
+    // Display name used by toString(); write() does not serialize it.
+    protected final String name;
+
+    /**
+     * Creates an empty instance; its state is filled in by {@link #readFields(DataInput)}.
+     */
+    public RowKeyColumnExpression() {
+        name = null; // Only on client: the name is never serialized
+    }
+
+    private RowKeyColumnExpression(PDatum datum, RowKeyValueAccessor accessor, PDataType fromType, String name) {
+        super(datum);
+        this.accessor = accessor;
+        this.fromType = fromType;
+        this.name = name;
+    }
+
+    /** Uses the datum's own data type as the source type and {@code datum.toString()} as the name. */
+    public RowKeyColumnExpression(PDatum datum, RowKeyValueAccessor accessor) {
+        this(datum, accessor, datum.getDataType(), datum.toString());
+    }
+
+    /** Uses the datum's own data type as the source type and the given display name. */
+    public RowKeyColumnExpression(PDatum datum, RowKeyValueAccessor accessor, String name) {
+        this(datum, accessor, datum.getDataType(), name);
+    }
+
+    /** Uses the given source type and {@code datum.toString()} as the name. */
+    public RowKeyColumnExpression(PDatum datum, RowKeyValueAccessor accessor, PDataType fromType) {
+        this(datum, accessor, fromType, datum.toString());
+    }
+
+    /** @return the index reported by the accessor */
+    public int getPosition() {
+        return accessor.getIndex();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = super.hashCode();
+        result = prime * result + ((accessor == null) ? 0 : accessor.hashCode());
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return name == null ? "PK[" + accessor.getIndex() + "]" : name;
+    }
+
+    /**
+     * Equal when the superclass considers the objects equal, the classes match and the
+     * accessors are equal. A null accessor is tolerated, mirroring {@link #hashCode()},
+     * and only equals another null accessor.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (!super.equals(obj)) return false;
+        if (getClass() != obj.getClass()) return false;
+        RowKeyColumnExpression other = (RowKeyColumnExpression)obj;
+        return accessor == null ? other.accessor == null : accessor.equals(other.accessor);
+    }
+
+    /**
+     * Loads the row key of the tuple into ptr and narrows ptr to this column's bytes.
+     *
+     * @return true if the accessor's offset lies inside the key, in which case ptr holds the
+     *         coerced column bytes, or an empty array when the column's length is zero;
+     *         false if the offset is at or beyond the end of the key
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        tuple.getKey(ptr);
+        int offset = accessor.getOffset(ptr.get(), ptr.getOffset());
+        // Null is represented in the last expression of a multi-part key
+        // by the bytes not being present.
+        int maxOffset = ptr.getOffset() + ptr.getLength();
+        if (offset < maxOffset) {
+            byte[] buffer = ptr.get();
+            int fixedByteSize = -1;
+            // FIXME: fixedByteSize <= maxByteSize ? fixedByteSize : 0 required because HBase passes bogus keys to filter to position scan (HBASE-6562)
+            if (fromType.isFixedWidth()) {
+                fixedByteSize = getByteSize();
+                // NOTE(review): this compares a byte count against an absolute end offset;
+                // presumably offset + fixedByteSize <= maxOffset was intended - confirm before changing.
+                fixedByteSize = fixedByteSize <= maxOffset ? fixedByteSize : 0;
+            }
+            // Fixed-width types use their declared size; otherwise the accessor measures the value.
+            int length = fixedByteSize >= 0 ? fixedByteSize  : accessor.getLength(buffer, offset, maxOffset);
+            // In the middle of the key, an empty variable length byte array represents null
+            if (length > 0) {
+                ptr.set(buffer,offset,length);
+                type.coerceBytes(ptr, fromType, getColumnModifier(), getColumnModifier());
+            } else {
+                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /** Reads the superclass state followed by the accessor; the display name is not part of the stream. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        accessor = new RowKeyValueAccessor();
+        accessor.readFields(input);
+        fromType = type; // fromType only needed on client side
+    }
+
+    /** Writes the superclass state followed by the accessor. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        accessor.write(output);
+    }
+
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyExpression.java
new file mode 100644
index 0000000..c311932
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowKeyExpression.java
@@ -0,0 +1,25 @@
+package org.apache.phoenix.expression;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class RowKeyExpression extends BaseTerminalExpression {
+    /** Shared instance; the constructor is private, so callers use this constant. */
+    public static final RowKeyExpression INSTANCE = new RowKeyExpression();
+
+    private RowKeyExpression() {
+    }
+
+    /** The row key is reported as VARBINARY. */
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /**
+     * Asks the tuple to load its key into ptr.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        tuple.getKey(ptr);
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
new file mode 100644
index 0000000..3e410cb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/RowValueConstructorExpression.java
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for row value constructor (a,b,c) expression.
+ * 
+ * @author samarth.jain
+ * @since 0.1
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.expression.function.CeilDecimalExpression;
+import org.apache.phoenix.expression.function.CeilTimestampExpression;
+import org.apache.phoenix.expression.function.FloorDateExpression;
+import org.apache.phoenix.expression.function.FloorDecimalExpression;
+import org.apache.phoenix.expression.function.TimeUnit;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+public class RowValueConstructorExpression extends BaseCompoundExpression {
+
+    // Values of the children evaluated so far, indexed by child position. An entry is
+    // left null when the child produced no bytes or could not be evaluated.
+    private ImmutableBytesWritable ptrs[];
+    // Result computed once in init() when the instance is flagged constant; non-null
+    // also marks this expression as stateless.
+    private ImmutableBytesWritable literalExprPtr;
+    // Index of the next child to evaluate, so evaluate() resumes where a previous call stopped.
+    private int counter;
+    // Running size estimate used to pre-size the output buffer in evaluate().
+    private int estimatedByteSize;
+
+    /**
+     * Strategy that adapts the right hand side of a comparison to the type of the left hand side.
+     */
+    public static interface ExpressionComparabilityWrapper {
+        public Expression wrap(Expression lhs, Expression rhs) throws SQLException;
+    }
+    /*
+     * Used to coerce the RHS to the expected type based on the LHS. In some circumstances,
+     * we may need to round the value up or down. For example:
+     * WHERE (a,b) < (2.4, 'foo')
+     * We take the ceiling of 2.4 to make it 3 if a is an INTEGER to prevent needing to coerce
+     * every time during evaluation.
+     *
+     * Indexed by CompareOp ordinal; operators without an entry stay null and are rejected
+     * by getWrapper().
+     */
+    private static final ExpressionComparabilityWrapper[] WRAPPERS = new ExpressionComparabilityWrapper[CompareOp.values().length];
+    static {
+        WRAPPERS[CompareOp.LESS.ordinal()] = new ExpressionComparabilityWrapper() {
+
+            @Override
+            public Expression wrap(Expression lhs, Expression rhs) throws SQLException {
+                Expression e = rhs;
+                PDataType rhsType = rhs.getDataType();
+                PDataType lhsType = lhs.getDataType();
+                if (rhsType == PDataType.DECIMAL && lhsType != PDataType.DECIMAL) {
+                    e = FloorDecimalExpression.create(rhs);
+                } else if ((rhsType == PDataType.TIMESTAMP || rhsType == PDataType.UNSIGNED_TIMESTAMP)  && (lhsType != PDataType.TIMESTAMP && lhsType != PDataType.UNSIGNED_TIMESTAMP)) {
+                    e = FloorDateExpression.create(rhs, TimeUnit.MILLISECOND);
+                }
+                e = new CoerceExpression(e, lhsType, lhs.getColumnModifier(), lhs.getByteSize());
+                return e;
+            }
+
+        };
+        WRAPPERS[CompareOp.LESS_OR_EQUAL.ordinal()] = WRAPPERS[CompareOp.LESS.ordinal()];
+
+        WRAPPERS[CompareOp.GREATER.ordinal()] = new ExpressionComparabilityWrapper() {
+
+            @Override
+            public Expression wrap(Expression lhs, Expression rhs) throws SQLException {
+                Expression e = rhs;
+                PDataType rhsType = rhs.getDataType();
+                PDataType lhsType = lhs.getDataType();
+                if (rhsType == PDataType.DECIMAL && lhsType != PDataType.DECIMAL) {
+                    e = CeilDecimalExpression.create(rhs);
+                } else if ((rhsType == PDataType.TIMESTAMP || rhsType == PDataType.UNSIGNED_TIMESTAMP)  && (lhsType != PDataType.TIMESTAMP && lhsType != PDataType.UNSIGNED_TIMESTAMP)) {
+                    e = CeilTimestampExpression.create(rhs);
+                }
+                e = new CoerceExpression(e, lhsType, lhs.getColumnModifier(), lhs.getByteSize());
+                return e;
+            }
+
+        };
+        WRAPPERS[CompareOp.GREATER_OR_EQUAL.ordinal()] = WRAPPERS[CompareOp.GREATER.ordinal()];
+    }
+
+    /**
+     * @return the wrapper registered for the operator
+     * @throws IllegalStateException if no wrapper is registered for the operator
+     */
+    private static ExpressionComparabilityWrapper getWrapper(CompareOp op) {
+        ExpressionComparabilityWrapper wrapper = WRAPPERS[op.ordinal()];
+        if (wrapper == null) {
+            throw new IllegalStateException("Unexpected compare op of " + op + " for row value constructor");
+        }
+        return wrapper;
+    }
+
+    /**
+     * Recursively coerce the RHS to match the LHS type, throwing if the types are incompatible. The
+     * recursion occurs when the RHS or LHS is a row value constructor.
+     * TODO: this no longer needs to be recursive, as we flatten out rvc when we normalize the statement.
+     * @param lhs left hand side expression
+     * @param rhs right hand side expression
+     * @param op operator being used to compare the expressions, which can affect rounding we may need to do.
+     * @return the newly coerced expression
+     * @throws SQLException
+     */
+    public static Expression coerce(Expression lhs, Expression rhs, CompareOp op) throws SQLException {
+        return coerce(lhs, rhs, getWrapper(op));
+    }
+
+    /**
+     * Same as {@link #coerce(Expression, Expression, CompareOp)} but with the wrapper supplied
+     * directly. Children are paired positionally; a missing RHS child becomes a null literal of
+     * the LHS type, a missing LHS child leaves the RHS child as is, and trailing null literals
+     * are dropped from the result.
+     * @param lhs left hand side expression
+     * @param rhs right hand side expression
+     * @param wrapper strategy applied to each non-row-value-constructor pair
+     * @return rhs itself when coercion changed nothing, otherwise the coerced expression
+     * @throws SQLException if a pair of types is not castable
+     */
+    public static Expression coerce(Expression lhs, Expression rhs, ExpressionComparabilityWrapper wrapper) throws SQLException {
+
+        if (lhs instanceof RowValueConstructorExpression && rhs instanceof RowValueConstructorExpression) {
+            int i = 0;
+            List<Expression> coercedNodes = Lists.newArrayListWithExpectedSize(Math.max(lhs.getChildren().size(), rhs.getChildren().size()));
+            for (; i < Math.min(lhs.getChildren().size(),rhs.getChildren().size()); i++) {
+                coercedNodes.add(coerce(lhs.getChildren().get(i), rhs.getChildren().get(i), wrapper));
+            }
+            // Only one of the two loops below runs: whichever side has children left over.
+            for (; i < lhs.getChildren().size(); i++) {
+                coercedNodes.add(coerce(lhs.getChildren().get(i), null, wrapper));
+            }
+            for (; i < rhs.getChildren().size(); i++) {
+                coercedNodes.add(coerce(null, rhs.getChildren().get(i), wrapper));
+            }
+            trimTrailingNulls(coercedNodes);
+            return coercedNodes.equals(rhs.getChildren()) ? rhs : new RowValueConstructorExpression(coercedNodes, rhs.isStateless());
+        } else if (lhs instanceof RowValueConstructorExpression) {
+            List<Expression> coercedNodes = Lists.newArrayListWithExpectedSize(Math.max(rhs.getChildren().size(), lhs.getChildren().size()));
+            coercedNodes.add(coerce(lhs.getChildren().get(0), rhs, wrapper));
+            for (int i = 1; i < lhs.getChildren().size(); i++) {
+                coercedNodes.add(coerce(lhs.getChildren().get(i), null, wrapper));
+            }
+            trimTrailingNulls(coercedNodes);
+            return coercedNodes.equals(rhs.getChildren()) ? rhs : new RowValueConstructorExpression(coercedNodes, rhs.isStateless());
+        } else if (rhs instanceof RowValueConstructorExpression) {
+            List<Expression> coercedNodes = Lists.newArrayListWithExpectedSize(Math.max(rhs.getChildren().size(), lhs.getChildren().size()));
+            coercedNodes.add(coerce(lhs, rhs.getChildren().get(0), wrapper));
+            for (int i = 1; i < rhs.getChildren().size(); i++) {
+                coercedNodes.add(coerce(null, rhs.getChildren().get(i), wrapper));
+            }
+            trimTrailingNulls(coercedNodes);
+            return coercedNodes.equals(rhs.getChildren()) ? rhs : new RowValueConstructorExpression(coercedNodes, rhs.isStateless());
+        } else if (lhs == null) {
+            return rhs;
+        } else if (rhs == null) {
+            return LiteralExpression.newConstant(null, lhs.getDataType(), lhs.isDeterministic());
+        } else {
+            if (rhs.getDataType() != null && lhs.getDataType() != null && !rhs.getDataType().isCastableTo(lhs.getDataType())) {
+                throw TypeMismatchException.newException(lhs.getDataType(), rhs.getDataType());
+            }
+            return wrapper.wrap(lhs, rhs);
+        }
+    }
+
+    /**
+     * Removes null-valued literal expressions from the end of the list, stopping at the
+     * first element that is not one. The list may end up empty.
+     */
+    private static void trimTrailingNulls(List<Expression> expressions) {
+        for (int i = expressions.size() - 1; i >= 0; i--) {
+            Expression e = expressions.get(i);
+            if (e instanceof LiteralExpression && ((LiteralExpression)e).getValue() == null) {
+                expressions.remove(i);
+            } else {
+                break;
+            }
+        }
+    }
+
+
+    /**
+     * Creates an empty instance; its state is filled in by {@link #readFields(DataInput)}.
+     */
+    public RowValueConstructorExpression() {
+    }
+
+    /**
+     * @param children the expressions making up the row value constructor
+     * @param isConstant when true the value is computed once up front and reused by every
+     *        later call to evaluate
+     */
+    public RowValueConstructorExpression(List<Expression> children, boolean isConstant) {
+        super(children);
+        counter = 0;
+        estimatedByteSize = 0;
+        init(isConstant);
+    }
+
+    public int getEstimatedSize() {
+        return estimatedByteSize;
+    }
+
+    /** @return true when the value was pre-computed at initialization */
+    @Override
+    public boolean isStateless() {
+        return literalExprPtr != null;
+    }
+
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> l = acceptChildren(visitor, visitor.visitEnter(this));
+        T t = visitor.visitLeave(this, l);
+        if (t == null) {
+            t = visitor.defaultReturn(this, l);
+        }
+        return t;
+    }
+
+    /** Reads the superclass state, then the constant flag, and re-initializes from it. */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        init(input.readBoolean());
+    }
+
+    /** Writes the superclass state followed by whether a pre-computed value exists. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        super.write(output);
+        output.writeBoolean(literalExprPtr != null);
+    }
+
+    /**
+     * Allocates the per-child value slots and, for a constant instance, evaluates once
+     * with a null tuple and caches the result.
+     */
+    private void init(boolean isConstant) {
+        this.ptrs = new ImmutableBytesWritable[children.size()];
+        if(isConstant) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            this.evaluate(null, ptr);
+            literalExprPtr = ptr;
+        }
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /** Discards partially evaluated state so the next evaluate call starts at the first child. */
+    @Override
+    public void reset() {
+        counter = 0;
+        estimatedByteSize = 0;
+        Arrays.fill(ptrs, null);
+    }
+
+    /**
+     * @return the number of bytes to account for a child that has no value: 1 for a
+     *         variable-width type, otherwise the child's byte size but never less than 1
+     */
+    private static int getExpressionByteCount(Expression e) {
+        PDataType childType = e.getDataType();
+        if (childType != null && !childType.isFixedWidth()) {
+            return 1;
+        } else {
+            // Write at least one null byte in the case of the child being null with a childType of null
+            Integer byteSize = e.getByteSize();
+            int bytesToWrite = byteSize == null ? 1 : Math.max(1, byteSize);
+            return bytesToWrite;
+        }
+    }
+
+    /**
+     * Evaluates the children in order, starting at the first one not yet evaluated, and
+     * concatenates their values into ptr. Variable-width values are followed by a separator
+     * byte, valueless children are written as separator bytes, and separator bytes left at
+     * the end by variable-width children are trimmed.
+     *
+     * @return true once every child has been evaluated and ptr holds the combined value;
+     *         false if a child could not be evaluated against a tuple that is neither null
+     *         nor immutable
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if(literalExprPtr != null) {
+            // if determined during construction that the row value constructor is just comprised of literal expressions,
+            // let's just return the ptr we have already computed and be done with evaluation.
+            ptr.set(literalExprPtr.get(), literalExprPtr.getOffset(), literalExprPtr.getLength());
+            return true;
+        }
+        try {
+            int j;
+            // One past the index of the last child that produced a non-empty value.
+            int expressionCount = counter;
+            for(j = counter; j < ptrs.length; j++) {
+                final Expression expression = children.get(j);
+                // TODO: handle overflow and underflow
+                if (expression.evaluate(tuple, ptr)) {
+                    if (ptr.getLength() == 0) {
+                        estimatedByteSize += getExpressionByteCount(expression);
+                    } else {
+                        expressionCount = j+1;
+                        ptrs[j] = new ImmutableBytesWritable();
+                        ptrs[j].set(ptr.get(), ptr.getOffset(), ptr.getLength());
+                        estimatedByteSize += ptr.getLength() + (expression.getDataType().isFixedWidth() ? 0 : 1); // 1 extra for the separator byte.
+                    }
+                    counter++;
+                } else if (tuple == null || tuple.isImmutable()) {
+                    // Nothing further can arrive for this tuple, so treat the child as having no value.
+                    estimatedByteSize += getExpressionByteCount(expression);
+                    counter++;
+                } else {
+                    return false;
+                }
+            }
+
+            if (j == ptrs.length) {
+                if (expressionCount == 0) {
+                    ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+                    return true;
+                }
+                if (expressionCount == 1) {
+                    ptr.set(ptrs[0].get(), ptrs[0].getOffset(), ptrs[0].getLength());
+                    return true;
+                }
+                TrustedByteArrayOutputStream output = new TrustedByteArrayOutputStream(estimatedByteSize);
+                try {
+                    boolean previousCarryOver = false;
+                    for (int i = 0; i< expressionCount; i++) {
+                        Expression child = getChildren().get(i);
+                        PDataType childType = child.getDataType();
+                        ImmutableBytesWritable tempPtr = ptrs[i];
+                        if (tempPtr == null) {
+                            // Since we have a null and have no representation for null,
+                            // we must decrement the value of the current. Otherwise,
+                            // we'd have an ambiguity if this value happened to be the
+                            // min possible value.
+                            previousCarryOver = childType == null || childType.isFixedWidth();
+                            int bytesToWrite = getExpressionByteCount(child);
+                            for (int m = 0; m < bytesToWrite; m++) {
+                                output.write(QueryConstants.SEPARATOR_BYTE);
+                            }
+                        } else {
+                            output.write(tempPtr.get(), tempPtr.getOffset(), tempPtr.getLength());
+                            if (!childType.isFixedWidth()) {
+                                output.write(QueryConstants.SEPARATOR_BYTE);
+                            }
+                            if (previousCarryOver) {
+                                previousCarryOver = !ByteUtil.previousKey(output.getBuffer(), output.size());
+                            }
+                        }
+                    }
+                    int outputSize = output.size();
+                    byte[] outputBytes = output.getBuffer();
+                    // Walk back over trailing variable-width children, dropping one separator byte for each.
+                    for (int k = expressionCount -1 ;
+                            k >=0 &&  getChildren().get(k).getDataType() != null && !getChildren().get(k).getDataType().isFixedWidth() && outputBytes[outputSize-1] == QueryConstants.SEPARATOR_BYTE ; k--) {
+                        outputSize--;
+                    }
+                    ptr.set(outputBytes, 0, outputSize);
+                    return true;
+                } finally {
+                    output.close();
+                }
+            }
+            return false;
+        } catch (IOException e) {
+            throw new RuntimeException(e); //Impossible.
+        }
+    }
+
+    /**
+     * Renders the children as a parenthesized, comma separated list, e.g. "(a, b, c)".
+     * An instance without children renders as "()"; coerce can produce one after trailing
+     * null literals are trimmed.
+     */
+    @Override
+    public final String toString() {
+        StringBuilder buf = new StringBuilder("(");
+        for (int i = 0; i < children.size(); i++) {
+            if (i > 0) {
+                buf.append(", ");
+            }
+            buf.append(children.get(i));
+        }
+        buf.append(')');
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/StringConcatExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/StringConcatExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/StringConcatExpression.java
new file mode 100644
index 0000000..24e96f7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/StringConcatExpression.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+
+
+/**
+ * 
+ * Expression implementing the <code>||</code> string concatenation operator.
+ * The result type is always {@link PDataType#VARCHAR}.
+ *
+ * @author kmahadik
+ * @since 0.1
+ */
+
+public class StringConcatExpression extends BaseCompoundExpression {
+    public StringConcatExpression() {
+    }
+
+    public StringConcatExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Formats the children as <code>(a || b || c)</code>.
+     */
+    @Override
+    public String toString() {
+        int lastIndex = children.size() - 1;
+        StringBuilder text = new StringBuilder("(");
+        for (int idx = 0; idx < lastIndex; idx++) {
+            text.append(children.get(idx)).append(" || ");
+        }
+        text.append(children.get(lastIndex));
+        text.append(')');
+        return text.toString();
+    }
+
+    /**
+     * Visits the children and then this node, falling back to the visitor's
+     * default return value when <code>visitLeave</code> yields null.
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> childResults = acceptChildren(visitor, visitor.visitEnter(this));
+        T visited = visitor.visitLeave(this, childResults);
+        return visited != null ? visited : visitor.defaultReturn(this, childResults);
+    }
+
+    /**
+     * Concatenates the bytes produced by each child, in order, and points
+     * <code>ptr</code> at the combined array. Children whose data type is null
+     * or whose evaluation returns false are skipped.
+     *
+     * @param tuple the row being evaluated, handed through to each child
+     * @param ptr scratch pointer used for each child and set to the final result
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        byte[] joined = ByteUtil.EMPTY_BYTE_ARRAY;
+        for (Expression child : children) {
+            if (child.getDataType() != null && child.evaluate(tuple, ptr)) {
+                PDataType childType = child.getDataType();
+                ColumnModifier columnModifier = child.getColumnModifier();
+                byte[] piece;
+                if (childType.isCoercibleTo(PDataType.VARCHAR)) {
+                    // The column modifier could be left unapplied here, but a new
+                    // array is being allocated anyway, so apply it while copying.
+                    piece = ByteUtil.concat(columnModifier, ptr);
+                } else {
+                    // Not VARCHAR-compatible: go through the object's string form.
+                    piece = PDataType.VARCHAR.toBytes(childType.toObject(ptr, columnModifier).toString());
+                }
+                joined = ByteUtil.concat(joined, piece);
+            }
+        }
+        ptr.set(joined);
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARCHAR;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/SubtractExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/SubtractExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/SubtractExpression.java
new file mode 100644
index 0000000..a9c9c93
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/SubtractExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.query.QueryConstants;
+
+
+/**
+ * 
+ * Abstract base for subtraction expressions. Supplies the visitor plumbing
+ * and the <code>" - "</code> operator string; concrete subclasses provide the
+ * remaining behavior.
+ *
+ * @author kmahadik
+ * @since 0.1
+ */
+public abstract class SubtractExpression extends BaseAddSubtractExpression {
+    /** {@link QueryConstants#MILLIS_IN_DAY} as a {@link BigDecimal}, available to subclasses. */
+    protected static final BigDecimal BD_MILLIS_IN_DAY = BigDecimal.valueOf(QueryConstants.MILLIS_IN_DAY);
+
+    public SubtractExpression() {
+    }
+
+    public SubtractExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Visits the children and then this node, falling back to the visitor's
+     * default return value when <code>visitLeave</code> yields null.
+     */
+    @Override
+    public final <T> T accept(ExpressionVisitor<T> visitor) {
+        List<T> childResults = acceptChildren(visitor, visitor.visitEnter(this));
+        T visited = visitor.visitLeave(this, childResults);
+        return visited != null ? visited : visitor.defaultReturn(this, childResults);
+    }
+    
+    /**
+     * @return the operator text <code>" - "</code>, including the surrounding spaces
+     */
+    @Override
+    public String getOperatorString() {
+        return " - ";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampAddExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampAddExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampAddExpression.java
new file mode 100644
index 0000000..1206704
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampAddExpression.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.DateUtil;
+
+/**
+ * 
+ * Class to encapsulate addition arithmetic for {@link PDataType#TIMESTAMP}.
+ * Every operand is converted to a {@link BigDecimal}, the operands are summed,
+ * and the sum is turned back into a timestamp.
+ *
+ * @author samarth.jain
+ * @since 2.1.3
+ */
+
+public class TimestampAddExpression extends AddExpression {
+
+    public TimestampAddExpression() {
+    }
+
+    public TimestampAddExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Adds up all children and writes the resulting timestamp bytes to <code>ptr</code>.
+     *
+     * @param tuple the row being evaluated, handed through to each child
+     * @param ptr scratch pointer used for each child and set to the final result
+     * @return false as soon as a child fails to evaluate; true otherwise. When a
+     *         child evaluates to a zero length value, true is returned right away
+     *         with <code>ptr</code> left as that child set it.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        BigDecimal sum = BigDecimal.ZERO;
+        
+        for (Expression child : children) {
+            if (!child.evaluate(tuple, ptr)) {
+                return false;
+            }
+            if (ptr.getLength() == 0) {
+                return true;
+            }
+            PDataType type = child.getDataType();
+            ColumnModifier columnModifier = child.getColumnModifier();
+            sum = sum.add(decodeOperand(ptr, type, columnModifier));
+        }
+        Timestamp timestamp = DateUtil.getTimestamp(sum);
+        byte[] encoded = new byte[getDataType().getByteSize()];
+        PDataType.TIMESTAMP.toBytes(timestamp, encoded, 0);
+        ptr.set(encoded);
+        return true;
+    }
+
+    /**
+     * Converts the operand bytes in <code>ptr</code> to a {@link BigDecimal}.
+     * Timestamps are converted directly; values coercible to DECIMAL or DOUBLE
+     * are multiplied by {@link QueryConstants#BD_MILLIS_IN_DAY} and rounded to
+     * six decimal places; anything else is decoded as a long and used unscaled.
+     */
+    private static BigDecimal decodeOperand(ImmutableBytesWritable ptr, PDataType type, ColumnModifier columnModifier) {
+        if (type == PDataType.TIMESTAMP || type == PDataType.UNSIGNED_TIMESTAMP) {
+            return (BigDecimal) PDataType.DECIMAL.toObject(ptr, type, columnModifier);
+        }
+        if (type.isCoercibleTo(PDataType.DECIMAL)) {
+            BigDecimal decoded = (BigDecimal) PDataType.DECIMAL.toObject(ptr, columnModifier);
+            return decoded.multiply(QueryConstants.BD_MILLIS_IN_DAY).setScale(6, RoundingMode.HALF_UP);
+        }
+        if (type.isCoercibleTo(PDataType.DOUBLE)) {
+            BigDecimal decoded = BigDecimal.valueOf(type.getCodec().decodeDouble(ptr, columnModifier));
+            return decoded.multiply(QueryConstants.BD_MILLIS_IN_DAY).setScale(6, RoundingMode.HALF_UP);
+        }
+        return BigDecimal.valueOf(type.getCodec().decodeLong(ptr, columnModifier));
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.TIMESTAMP;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampSubtractExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampSubtractExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampSubtractExpression.java
new file mode 100644
index 0000000..57450c0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/TimestampSubtractExpression.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.DateUtil;
+/**
+ * 
+ * Class to encapsulate subtraction arithmetic for {@link PDataType#TIMESTAMP}.
+ * Every operand is converted to a {@link BigDecimal}; the first operand seeds
+ * the result and each later operand is subtracted from it.
+ *
+ * @author samarth.jain
+ * @since 2.1.3
+ */
+public class TimestampSubtractExpression extends SubtractExpression {
+
+    public TimestampSubtractExpression() {
+    }
+
+    public TimestampSubtractExpression(List<Expression> children) {
+        super(children);
+    }
+
+    /**
+     * Subtracts the later children from the first one and writes the resulting
+     * timestamp bytes to <code>ptr</code>.
+     *
+     * @param tuple the row being evaluated, handed through to each child
+     * @param ptr scratch pointer used for each child and set to the final result
+     * @return false as soon as a child fails to evaluate; true otherwise. When a
+     *         child evaluates to a zero length value, true is returned right away
+     *         with <code>ptr</code> left as that child set it.
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        BigDecimal difference = BigDecimal.ZERO;
+        
+        for (int idx = 0; idx < children.size(); idx++) {
+            Expression child = children.get(idx);
+            if (!child.evaluate(tuple, ptr)) {
+                return false;
+            }
+            if (ptr.getLength() == 0) {
+                return true;
+            }
+            PDataType type = child.getDataType();
+            ColumnModifier columnModifier = child.getColumnModifier();
+            BigDecimal operand = decodeOperand(ptr, type, columnModifier);
+            // The first operand is the minuend; everything after it is subtracted.
+            difference = (idx == 0) ? operand : difference.subtract(operand);
+        }
+        Timestamp timestamp = DateUtil.getTimestamp(difference);
+        byte[] encoded = new byte[getDataType().getByteSize()];
+        PDataType.TIMESTAMP.toBytes(timestamp, encoded, 0);
+        ptr.set(encoded);
+        return true;
+    }
+
+    /**
+     * Converts the operand bytes in <code>ptr</code> to a {@link BigDecimal}.
+     * Timestamps are converted directly; values coercible to DECIMAL or DOUBLE
+     * are multiplied by <code>BD_MILLIS_IN_DAY</code> and rounded to six decimal
+     * places; anything else is decoded as a long and used unscaled.
+     */
+    private static BigDecimal decodeOperand(ImmutableBytesWritable ptr, PDataType type, ColumnModifier columnModifier) {
+        if (type == PDataType.TIMESTAMP || type == PDataType.UNSIGNED_TIMESTAMP) {
+            return (BigDecimal) PDataType.DECIMAL.toObject(ptr, type, columnModifier);
+        }
+        if (type.isCoercibleTo(PDataType.DECIMAL)) {
+            BigDecimal decoded = (BigDecimal) PDataType.DECIMAL.toObject(ptr, columnModifier);
+            return decoded.multiply(BD_MILLIS_IN_DAY).setScale(6, RoundingMode.HALF_UP);
+        }
+        if (type.isCoercibleTo(PDataType.DOUBLE)) {
+            BigDecimal decoded = BigDecimal.valueOf(type.getCodec().decodeDouble(ptr, columnModifier));
+            return decoded.multiply(BD_MILLIS_IN_DAY).setScale(6, RoundingMode.HALF_UP);
+        }
+        return BigDecimal.valueOf(type.getCodec().decodeLong(ptr, columnModifier));
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.TIMESTAMP;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
new file mode 100644
index 0000000..4067eb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * An {@link Expression} whose value is built up incrementally, one row at a
+ * time, through {@link #aggregate(Tuple, ImmutableBytesWritable)}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface Aggregator extends Expression {
+    
+    /**
+     * Folds the current row into the running aggregate value.
+     * @param tuple the result containing all the key values of the row
+     * @param ptr the bytes pointer to the underlying result
+     */
+    void aggregate(Tuple tuple, ImmutableBytesWritable ptr);
+    
+    /**
+     * @return the size in bytes
+     */
+    int getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
new file mode 100644
index 0000000..18dfc4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ * 
+ * Represents an ordered list of Aggregators. Holds the aggregator array, the
+ * {@link SingleAggregateFunction} array supplied with it, and a
+ * {@link KeyValueSchema} built from the aggregators.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class Aggregators {
+    protected final int estimatedByteSize;
+    protected final KeyValueSchema schema;
+    protected final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    protected final ValueBitSet valueSet;
+    protected final Aggregator[] aggregators;
+    protected final SingleAggregateFunction[] functions;
+    
+    /**
+     * @param functions the aggregate functions, returned as-is by {@link #getFunctions()}
+     * @param aggregators the aggregators used to size this instance and build its schema
+     * @param minNullableIndex passed to the {@link KeyValueSchemaBuilder} that builds the schema
+     */
+    public Aggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators, int minNullableIndex) {
+        this.functions = functions;
+        this.aggregators = aggregators;
+        this.estimatedByteSize = calculateSize(aggregators);
+        this.schema = newValueSchema(aggregators, minNullableIndex);
+        this.valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    /**
+     * @return the size computed by {@link #calculateSize(Aggregator[])} at construction time
+     */
+    public int getEstimatedByteSize() {
+        return estimatedByteSize;
+    }
+    
+    public KeyValueSchema getValueSchema() {
+        return schema;
+    }
+    
+    /**
+     * Produces the class name, the function count in brackets, and then each
+     * function preceded by a tab and its index.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder(this.getClass().getName());
+        text.append(" [").append(functions.length).append("]:");
+        int index = 0;
+        for (SingleAggregateFunction function : functions) {
+            text.append("\t").append(index).append(") ").append(function);
+            index++;
+        }
+        return text.toString();
+    }
+    
+    /**
+     * Return the aggregate functions
+     */
+    public SingleAggregateFunction[] getFunctions() {
+        return functions;
+    }
+    
+    /**
+     * Aggregate over aggregators
+     * @param aggregators the aggregators to apply the row to
+     * @param result the single row Result from scan iteration
+     */
+    public abstract void aggregate(Aggregator[] aggregators, Tuple result);
+
+    /**
+     * Sums the array overhead, one pointer per element, and the size reported
+     * by each aggregator.
+     */
+    protected static int calculateSize(Aggregator[] aggregators) {
+        int total = SizedUtil.ARRAY_SIZE /*aggregators[]*/ + (SizedUtil.POINTER_SIZE * aggregators.length);
+        for (int idx = 0; idx < aggregators.length; idx++) {
+            total += aggregators[idx].getSize();
+        }
+        return total;
+    }
+    
+    /**
+     * Get the ValueSchema for the Aggregators
+     */
+    private static KeyValueSchema newValueSchema(Aggregator[] aggregators, int minNullableIndex) {
+        KeyValueSchemaBuilder schemaBuilder = new KeyValueSchemaBuilder(minNullableIndex);
+        for (Aggregator aggregator : aggregators) {
+            schemaBuilder.addField(aggregator);
+        }
+        return schemaBuilder.build();
+    }
+
+    /**
+     * @return byte representation of the ValueSchema
+     */
+    public byte[] toBytes(Aggregator[] aggregators) {
+        return schema.toBytes(aggregators, valueSet, ptr);
+    }
+    
+    public int getAggregatorCount() {
+        return aggregators.length;
+    }
+
+    public Aggregator[] getAggregators() {
+        return aggregators;
+    }
+    
+    public abstract Aggregator[] newAggregators();
+    
+    /**
+     * Calls <code>reset()</code> on every aggregator in the supplied array.
+     */
+    public void reset(Aggregator[] aggregators) {
+        for (Aggregator aggregator : aggregators) {
+            aggregator.reset();
+        }
+    }
+    
+    protected Aggregator getAggregator(int position) {
+        return aggregators[position];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
new file mode 100644
index 0000000..8dd6668
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * Base class for Aggregator implementations. Stores the column modifier and
+ * supplies default answers for nullability and size.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class BaseAggregator extends BaseTerminalExpression implements Aggregator {
+    
+    protected final ColumnModifier columnModifier;    
+    
+    public BaseAggregator(ColumnModifier columnModifier) {
+        this.columnModifier = columnModifier;
+    }
+    
+    /**
+     * @return always true
+     */
+    @Override
+    public boolean isNullable() {
+        return true;
+    }
+    
+    /**
+     * @return {@link SizedUtil#OBJECT_SIZE}
+     */
+    @Override
+    public int getSize() {
+        return SizedUtil.OBJECT_SIZE;
+    }
+    
+    /**
+     * Evaluates the given aggregator, which must be a {@link CountAggregator},
+     * with a null tuple and returns the pointer it filled in.
+     *
+     * @param clientAgg the aggregator to evaluate; cast to CountAggregator
+     * @return a new pointer holding whatever the aggregator's evaluate wrote
+     */
+    ImmutableBytesWritable evalClientAggs(Aggregator clientAgg) {
+        CountAggregator countAggregator = (CountAggregator) clientAgg;
+        ImmutableBytesWritable result = new ImmutableBytesWritable();
+        countAggregator.evaluate(null, result);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
new file mode 100644
index 0000000..b28cb92
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.*;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.BigDecimalUtil.Operation;
+
+/**
+ * 
+ * Base class for standard deviation aggregators over DECIMAL values. The
+ * result is computed from the distinct values and their counts
+ * (<code>valueVsCount</code> and <code>totalCount</code>, which are not
+ * declared in this class) and cached until {@link #reset()} is called.
+ * Subclasses choose the divisor via {@link #getDataPointsCount()}.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseDecimalStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+    private BigDecimal cachedResult = null;
+    private int colPrecision;
+    private int colScale;
+
+    /**
+     * @param exps the expressions; the first must be a {@link ColumnExpression},
+     *        whose max length and scale are used as the precision and scale
+     * @param columnModifier passed to the superclass constructor
+     */
+    public BaseDecimalStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        ColumnExpression stdDevColExp = (ColumnExpression)exps.get(0);
+        this.colPrecision = stdDevColExp.getMaxLength();
+        this.colScale = stdDevColExp.getScale();
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    /**
+     * Computes the standard deviation on first use, caches it, and points
+     * <code>ptr</code> at its DECIMAL encoding.
+     *
+     * @param tuple not used by this method
+     * @param ptr set to the encoded result
+     * @return always true
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            BigDecimal ssd = sumSquaredDeviation();
+            ssd = ssd.divide(new BigDecimal(getDataPointsCount()), PDataType.DEFAULT_MATH_CONTEXT);
+            // Calculate the precision for the stddev result.
+            // There are totalCount #Decimal values for which we are calculating the stddev
+            // The resultant precision depends on precision and scale of all these values. (See
+            // BigDecimalUtil.getResultPrecisionScale)
+            // As of now we are not using the actual precision and scale of individual values but just using the table
+            // column's max length(precision) and scale for each of the values.
+            // NOTE(review): every iteration passes the same column precision and scale, so the
+            // value assigned to resultPrecision does not change from one iteration to the next.
+            int resultPrecision = colPrecision;
+            for (int i = 1; i < this.totalCount; i++) {
+                // Max precision that we can support is 38 See PDataType.MAX_PRECISION
+                if (resultPrecision >= PDataType.MAX_PRECISION) break;
+                Pair<Integer, Integer> precisionScale = BigDecimalUtil.getResultPrecisionScale(this.colPrecision,
+                        this.colScale, this.colPrecision, this.colScale, Operation.OTHERS);
+                resultPrecision = precisionScale.getFirst();
+            }
+            cachedResult = new BigDecimal(Math.sqrt(ssd.doubleValue()), new MathContext(resultPrecision,
+                    RoundingMode.HALF_UP));
+            // BigDecimal is immutable: setScale returns a new instance, so the result must be
+            // assigned back for the column scale to take effect.
+            cachedResult = cachedResult.setScale(this.colScale, RoundingMode.HALF_UP);
+        }
+        if (buffer == null) {
+            // NOTE(review): the buffer is replaced on the next line regardless; kept in case
+            // initBuffer() has effects beyond allocating it.
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    /**
+     * @return the count the sum of squared deviations is divided by
+     */
+    protected abstract long getDataPointsCount();
+
+    /**
+     * Sums <code>(value - mean)^2 * count</code> over all distinct values.
+     */
+    private BigDecimal sumSquaredDeviation() {
+        BigDecimal m = mean();
+        BigDecimal result = BigDecimal.ZERO;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+            BigDecimal delta = colValue.subtract(m);
+            result = result.add(delta.multiply(delta).multiply(new BigDecimal(entry.getValue())));
+        }
+        return result;
+    }
+
+    /**
+     * Computes the count-weighted sum of the distinct values divided by <code>totalCount</code>.
+     */
+    private BigDecimal mean() {
+        BigDecimal sum = BigDecimal.ZERO;
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            BigDecimal colValue = (BigDecimal)PDataType.DECIMAL.toObject(entry.getKey());
+            sum = sum.add(colValue.multiply(new BigDecimal(entry.getValue())));
+        }
+        return sum.divide(new BigDecimal(totalCount), PDataType.DEFAULT_MATH_CONTEXT);
+    }
+
+    /**
+     * Resets the superclass state and drops the cached result so the next
+     * {@link #evaluate(Tuple, ImmutableBytesWritable)} recomputes it.
+     */
+    @Override
+    public void reset() {
+        super.reset();
+        this.cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
new file mode 100644
index 0000000..eaafc79
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Base class for client-side standard deviation aggregators that do their arithmetic in
+ * {@code double}. The distinct values and their occurrence counts come from the parent
+ * class; subclasses decide the divisor through {@link #getDataPointsCount()}.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class BaseStddevAggregator extends DistinctValueWithCountClientAggregator {
+
+    /** Expression whose data type is used to decode the aggregated values. */
+    protected Expression stdDevColExp;
+    /** Result of the last computation; cleared by {@link #reset()}. */
+    private BigDecimal cachedResult = null;
+
+    /**
+     * @param exps expressions of the aggregate function; only the first one is used
+     * @param columnModifier passed through to the parent aggregator
+     */
+    public BaseStddevAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(columnModifier);
+        this.stdDevColExp = exps.get(0);
+    }
+
+    @Override
+    protected int getBufferLength() {
+        return PDataType.DECIMAL.getByteSize();
+    }
+
+    /**
+     * Writes the DECIMAL-encoded standard deviation into {@code ptr}. The value is computed
+     * once as {@code sqrt(sumSquaredDeviation / getDataPointsCount())} and cached until
+     * {@link #reset()}.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (cachedResult == null) {
+            // NOTE(review): when getDataPointsCount() is 0 the quotient is NaN or infinite
+            // and new BigDecimal(double) throws NumberFormatException - confirm intended.
+            double variance = sumSquaredDeviation() / getDataPointsCount();
+            cachedResult = new BigDecimal(Math.sqrt(variance));
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+        buffer = PDataType.DECIMAL.toBytes(cachedResult);
+        ptr.set(buffer);
+        return true;
+    }
+
+    /**
+     * @return the divisor applied to the sum of squared deviations
+     */
+    protected abstract long getDataPointsCount();
+
+    /** Decodes one aggregated key as a double, coercing from the column's data type. */
+    private double toDouble(ImmutableBytesPtr key) {
+        return (Double)PDataType.DOUBLE.toObject(key, this.stdDevColExp.getDataType());
+    }
+
+    /** Sums {@code (value - mean)^2 * occurrences} over all distinct values. */
+    private double sumSquaredDeviation() {
+        final double average = mean();
+        double total = 0.0;
+        for (Entry<ImmutableBytesPtr, Integer> distinct : valueVsCount.entrySet()) {
+            double deviation = toDouble(distinct.getKey()) - average;
+            total += (deviation * deviation) * distinct.getValue();
+        }
+        return total;
+    }
+
+    /** Count-weighted sum of the distinct values divided by {@code totalCount}. */
+    private double mean() {
+        double weightedTotal = 0.0;
+        for (Entry<ImmutableBytesPtr, Integer> distinct : valueVsCount.entrySet()) {
+            weightedTotal += toDouble(distinct.getKey()) * distinct.getValue();
+        }
+        return weightedTotal / totalCount;
+    }
+
+    /** Resets the parent state and discards the cached result. */
+    @Override
+    public void reset() {
+        super.reset();
+        cachedResult = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
new file mode 100644
index 0000000..0ac5baf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+
+
+/**
+ *
+ * Aggregators that execute on the client-side
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ClientAggregators extends Aggregators {
+    /** Scratch bit set, refilled from the aggregate value on every call to aggregate. */
+    private final ValueBitSet tempValueSet;
+
+    /** Collects the aggregator of each function, in list order. */
+    private static Aggregator[] getAggregators(List<SingleAggregateFunction> aggFuncs) {
+        Aggregator[] collected = new Aggregator[aggFuncs.size()];
+        int slot = 0;
+        for (SingleAggregateFunction function : aggFuncs) {
+            collected[slot++] = function.getAggregator();
+        }
+        return collected;
+    }
+
+    /**
+     * @param functions aggregate functions to evaluate on the client
+     * @param minNullableIndex forwarded unchanged to the superclass constructor
+     */
+    public ClientAggregators(List<SingleAggregateFunction> functions, int minNullableIndex) {
+        super(
+                functions.toArray(new SingleAggregateFunction[functions.size()]),
+                getAggregators(functions),
+                minNullableIndex);
+        this.tempValueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /**
+     * Reads the aggregate value of {@code result} into {@code ptr}, then walks the schema
+     * field by field and feeds each field that has a value to the aggregator at the same
+     * index.
+     */
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        TupleUtil.getAggregateValue(result, ptr);
+        tempValueSet.clear();
+        tempValueSet.or(ptr);
+
+        // Capture the end of the value before the schema iterator repositions ptr.
+        final int maxOffset = ptr.getOffset() + ptr.getLength();
+        schema.iterator(ptr);
+        int index = 0;
+        while (true) {
+            Boolean hasValue = schema.next(ptr, index, maxOffset, tempValueSet);
+            if (hasValue == null) {
+                break;
+            }
+            if (hasValue.booleanValue()) {
+                aggregators[index].aggregate(result, ptr);
+            }
+            index++;
+        }
+    }
+
+    /** Creates one new client aggregator per function, in function order. */
+    @Override
+    public Aggregator[] newAggregators() {
+        final int count = functions.length;
+        Aggregator[] fresh = new Aggregator[count];
+        for (int slot = 0; slot < count; slot++) {
+            fresh[slot] = functions[slot].newClientAggregator();
+        }
+        return fresh;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
new file mode 100644
index 0000000..33740c2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/CountAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+/**
+ * 
+ * Aggregator for COUNT aggregations
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class CountAggregator extends BaseAggregator {
+
+    private long count = 0;
+    private byte[] buffer = null;
+
+    public CountAggregator() {
+        super(null);
+    }
+
+    public CountAggregator(LongSumAggregator clientAgg) {
+        this();
+        count = clientAgg.getSum();
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        count++;
+    }
+
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            buffer = new byte[getDataType().getByteSize()];
+        }
+        getDataType().getCodec().encodeLong(count, buffer, 0);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+
+    @Override
+    public void reset() {
+        count = 0;
+        buffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return "COUNT [count=" + count + "]";
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.LONG_SIZE + SizedUtil.ARRAY_SIZE
+                + getDataType().getByteSize();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
new file mode 100644
index 0000000..c707057
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevPopAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_POP aggregations for DECIMAL data type.
+ * All computation lives in {@link BaseDecimalStddevAggregator}; this class only chooses
+ * the data point count.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevPopAggregator extends BaseDecimalStddevAggregator {
+
+    /**
+     * @param exps expressions forwarded to the base aggregator
+     * @param columnModifier column modifier forwarded to the base aggregator
+     */
+    public DecimalStddevPopAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    /**
+     * @return the inherited {@code totalCount}, unadjusted (population form)
+     */
+    @Override
+    protected long getDataPointsCount() {
+        return totalCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
new file mode 100644
index 0000000..0aa1928
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalStddevSampAggregator.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.List;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnModifier;
+
+/**
+ * Client side Aggregator for STDDEV_SAMP aggregations for DECIMAL data type.
+ * All computation lives in {@link BaseDecimalStddevAggregator}; this class only chooses
+ * the data point count.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DecimalStddevSampAggregator extends BaseDecimalStddevAggregator {
+
+    /**
+     * @param exps expressions forwarded to the base aggregator
+     * @param columnModifier column modifier forwarded to the base aggregator
+     */
+    public DecimalStddevSampAggregator(List<Expression> exps, ColumnModifier columnModifier) {
+        super(exps, columnModifier);
+    }
+
+    /**
+     * @return the inherited {@code totalCount} minus one (sample form)
+     */
+    @Override
+    protected long getDataPointsCount() {
+        // NOTE(review): this is 0 when exactly one value was aggregated - confirm the base
+        // class copes with a zero data point count.
+        return totalCount - 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
new file mode 100644
index 0000000..5b91ef3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DecimalSumAggregator.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.math.BigDecimal;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ *
+ * Aggregator that sums BigDecimal values
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class DecimalSumAggregator extends BaseAggregator {
+    /** Running total; starts at zero. */
+    private BigDecimal sum = BigDecimal.ZERO;
+    /** Output buffer; stays {@code null} until a value has been seeded or aggregated. */
+    private byte[] sumBuffer;
+
+    /**
+     * @param columnModifier modifier applied when decoding aggregated values
+     * @param ptr optional DECIMAL-encoded starting sum; when {@code null} the sum starts at
+     *            zero and no buffer is allocated
+     */
+    public DecimalSumAggregator(ColumnModifier columnModifier, ImmutableBytesWritable ptr) {
+        super(columnModifier);
+        if (ptr != null) {
+            initBuffer();
+            sum = (BigDecimal)PDataType.DECIMAL.toObject(ptr);
+        }
+    }
+
+    private PDataType getInputDataType() {
+        return PDataType.DECIMAL;
+    }
+
+    /** Allocates the output buffer at the byte size of the result data type. */
+    private void initBuffer() {
+        sumBuffer = new byte[getDataType().getByteSize()];
+    }
+
+    /** Decodes the value in {@code ptr} and adds it to the running sum. */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        final PDataType resultType = getDataType();
+        BigDecimal addend = (BigDecimal)resultType.toObject(ptr, getInputDataType(), columnModifier);
+        sum = sum.add(addend);
+        if (sumBuffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * Encodes the sum into the buffer and points {@code ptr} at the encoded bytes.
+     *
+     * @return {@code false} when no buffer has been allocated yet, {@code true} otherwise
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        final boolean hasBuffer = sumBuffer != null;
+        if (hasBuffer) {
+            int written = getDataType().toBytes(sum, sumBuffer, 0);
+            ptr.set(sumBuffer, 0, written);
+        }
+        return hasBuffer;
+    }
+
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.DECIMAL;
+    }
+
+    /** Returns the sum to zero, drops the buffer, then resets the superclass. */
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+        sumBuffer = null;
+        super.reset();
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("DECIMAL SUM [sum=").append(sum).append("]").toString();
+    }
+
+    /** Superclass size plus the BigDecimal, the array overhead and the buffer bytes. */
+    @Override
+    public int getSize() {
+        int size = super.getSize();
+        size += SizedUtil.BIG_DECIMAL_SIZE;
+        size += SizedUtil.ARRAY_SIZE;
+        size += getDataType().getByteSize();
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
new file mode 100644
index 0000000..d221e91
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctCountClientAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator for DISTINCT COUNT aggregations. Evaluates to the number of
+ * distinct values held in the inherited value-to-count map.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public class DistinctCountClientAggregator extends DistinctValueWithCountClientAggregator {
+
+    public DistinctCountClientAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /** The result buffer is sized for a LONG. */
+    @Override
+    protected int getBufferLength() {
+        return PDataType.LONG.getByteSize();
+    }
+
+    /**
+     * Encodes the number of distinct values as a LONG and points {@code ptr} at it.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (buffer == null) {
+            initBuffer();
+        }
+        // Widen to long so the value is encoded as a LONG rather than an int.
+        final long distinctValues = valueVsCount.size();
+        final byte[] encoded = PDataType.LONG.toBytes(distinctValues);
+        buffer = encoded;
+        ptr.set(encoded);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
new file mode 100644
index 0000000..c009a28
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
+ * Each call to {@link #aggregate} merges one serialized value-to-count map into
+ * {@link #valueVsCount} and adds the counts to {@link #totalCount}.
+ *
+ * @author anoopsjohn
+ * @since 1.2.1
+ */
+public abstract class DistinctValueWithCountClientAggregator extends BaseAggregator {
+    /** Distinct value bytes mapped to the number of times each was seen. */
+    protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+    /** Result buffer, allocated on first use by {@link #initBuffer()}. */
+    protected byte[] buffer;
+    /** Sum of all counts merged so far. */
+    protected long totalCount = 0L;
+
+    public DistinctValueWithCountClientAggregator(ColumnModifier columnModifier) {
+        super(columnModifier);
+    }
+
+    /**
+     * Merges the serialized map in {@code ptr} into this aggregator.
+     * <p>
+     * The first byte is a marker; when it equals
+     * {@code DistinctValueWithCountServerAggregator.COMPRESS_MARKER} the remaining bytes are
+     * decompressed before parsing. The payload is a vint entry count followed, per entry, by
+     * a vint key length, the key bytes and a vint occurrence count.
+     *
+     * @param tuple not used by this method
+     * @param ptr the serialized value-to-count map
+     * @throws RuntimeException wrapping the {@link IOException} if the payload is truncated
+     *         or cannot be read
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        // Skip the leading marker byte; everything after it is the payload.
+        InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
+        try {
+            if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER, 0,
+                    1)) {
+                // NOTE(review): the decompressor obtained here is never handed back in this
+                // method - confirm whether it has to be returned to the codec's pool.
+                InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO
+                        .createDecompressionStream(is,
+                                DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0);
+                is = decompressionStream;
+            }
+            DataInputStream in = new DataInputStream(is);
+            int mapSize = WritableUtils.readVInt(in);
+            for (int i = 0; i < mapSize; i++) {
+                int keyLen = WritableUtils.readVInt(in);
+                byte[] keyBytes = new byte[keyLen];
+                // read() may return fewer bytes than requested (notably from a decompression
+                // stream), which would corrupt this key and every entry after it;
+                // readFully() fills the array or fails with EOFException.
+                in.readFully(keyBytes, 0, keyLen);
+                ImmutableBytesPtr key = new ImmutableBytesPtr(keyBytes);
+                int value = WritableUtils.readVInt(in);
+                Integer curCount = valueVsCount.get(key);
+                if (curCount == null) {
+                    valueVsCount.put(key, value);
+                } else {
+                    valueVsCount.put(key, curCount + value);
+                }
+                totalCount += value;
+            }
+        } catch (IOException ioe) {
+            // The source is in memory, so this indicates a truncated or corrupt payload.
+            throw new RuntimeException(ioe);
+        }
+        if (buffer == null) {
+            initBuffer();
+        }
+    }
+
+    /**
+     * @return the number of bytes to allocate for the result buffer
+     */
+    protected abstract int getBufferLength();
+
+    /** Allocates a fresh result buffer of {@link #getBufferLength()} bytes. */
+    protected void initBuffer() {
+        buffer = new byte[getBufferLength()];
+    }
+
+    @Override
+    public boolean isNullable() {
+        return false;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /** Discards all collected values, the buffer and the total, then resets the superclass. */
+    @Override
+    public void reset() {
+        valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+        buffer = null;
+        totalCount = 0L;
+        super.reset();
+    }
+
+    /**
+     * Decodes every distinct value with {@code type} and returns the values with their
+     * counts in a map ordered by {@code type.compareTo}.
+     *
+     * @param ascending {@code true} for ascending order, {@code false} for descending
+     * @param type data type used both to decode the keys and to compare them
+     * @return a new sorted map from decoded value to occurrence count
+     */
+    protected Map<Object, Integer> getSortedValueVsCount(final boolean ascending, final PDataType type) {
+        // To sort the valueVsCount
+        Comparator<Object> comparator = new Comparator<Object>() {
+            @Override
+            public int compare(Object o1, Object o2) {
+                if (ascending) {
+                    return type.compareTo(o1, o2);
+                }
+                // Descending order: compare with the operands swapped.
+                return type.compareTo(o2, o1);
+            }
+        };
+        Map<Object, Integer> sorted = new TreeMap<Object, Integer>(comparator);
+        for (Entry<ImmutableBytesPtr, Integer> entry : valueVsCount.entrySet()) {
+            sorted.put(type.toObject(entry.getKey(), columnModifier), entry.getValue());
+        }
+        return sorted;
+    }
+}


Mime
View raw message