phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject [2/3] phoenix git commit: PHOENIX-914 Native HBase timestamp support to optimize date range queries in Phoenix
Date Sat, 03 Oct 2015 21:38:22 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 08529cc..56087c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -17,11 +17,15 @@
  */
 package org.apache.phoenix.compile;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 
+import java.sql.Date;
 import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
@@ -43,6 +47,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -70,6 +75,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
+import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
@@ -84,7 +90,17 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTime;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PUnsignedDate;
+import org.apache.phoenix.schema.types.PUnsignedLong;
+import org.apache.phoenix.schema.types.PUnsignedTime;
+import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -96,7 +112,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -104,22 +120,34 @@ public class UpsertCompiler {
         if (table.getBucketNum() != null) {
             pkValues[0] = new byte[] {0};
         }
+        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
+        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
         for (int i = 0; i < values.length; i++) {
             byte[] value = values[i];
             PColumn column = table.getColumns().get(columnIndexes[i]);
             if (SchemaUtil.isPKColumn(column)) {
                 pkValues[pkSlotIndex[i]] = value;
+                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
+                    if (!useServerTimestamp) {
+                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
+                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
+                        if (rowTimestamp < 0) {
+                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
+                        }
+                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
+                    } 
+                }
             } else {
                 columnValues.put(column, value);
             }
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         table.newKey(ptr, pkValues);
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
+        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
     }
-
+    
     private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
-            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException {
+            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp) throws SQLException {
         PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -156,7 +184,7 @@ public class UpsertCompiler {
                             table.rowKeyOrderOptimizable());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -176,10 +204,12 @@ public class UpsertCompiler {
         private int[] columnIndexes;
         private int[] pkSlotIndexes;
         private final TableRef tableRef;
+        private final boolean useServerTimestamp;
 
-        private UpsertingParallelIteratorFactory (PhoenixConnection connection, TableRef tableRef) {
+        private UpsertingParallelIteratorFactory (PhoenixConnection connection, TableRef tableRef, boolean useServerTimestamp) {
             super(connection);
             this.tableRef = tableRef;
+            this.useServerTimestamp = useServerTimestamp;
         }
 
         @Override
@@ -196,7 +226,7 @@ public class UpsertCompiler {
             StatementContext childContext = new StatementContext(statement, false);
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes);
+            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
             return state;
         }
         
@@ -217,6 +247,17 @@ public class UpsertCompiler {
         this.statement = statement;
     }
     
+    private static LiteralParseNode getNodeForRowTimestampColumn(PColumn col) {
+        PDataType type = col.getDataType();
+        long dummyValue = 0L;
+        if (type.isCoercibleTo(PTimestamp.INSTANCE)) {
+            return new LiteralParseNode(new Timestamp(dummyValue), PTimestamp.INSTANCE);
+        } else if (type == PLong.INSTANCE || type == PUnsignedLong.INSTANCE) {
+            return new LiteralParseNode(dummyValue, PLong.INSTANCE);
+        }
+        throw new IllegalArgumentException();
+    }
+    
     public MutationPlan compile(UpsertStatement upsert) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -248,6 +289,7 @@ public class UpsertCompiler {
         // be out of date. We do not retry if auto commit is on, as we
         // update the cache up front when we create the resolver in that case.
         boolean retryOnce = !connection.getAutoCommit();
+        boolean useServerTimestampToBe = false;
         while (true) {
             try {
                 resolver = FromCompiler.getResolverForMutation(upsert, connection);
@@ -366,6 +408,21 @@ public class UpsertCompiler {
                         }
                         i++;
                     }
+                    // If a table has rowtimestamp col, then we always set it.
+                    useServerTimestampToBe = table.getRowTimestampColPos() != -1 && !isRowTimestampSet(pkSlotIndexesToBe, table);
+                    if (useServerTimestampToBe) {
+                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
+                        // Need to resize columnIndexesToBe and pkSlotIndexesToBe to include this extra column.
+                        columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, columnIndexesToBe.length + 1);
+                        pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, pkSlotIndexesToBe.length + 1);
+                        columnIndexesToBe[i] = rowTimestampCol.getPosition();
+                        pkColumnsSet.set(pkSlotIndexesToBe[i] = table.getRowTimestampColPos());
+                        targetColumns.add(rowTimestampCol);
+                        if (valueNodes != null && !valueNodes.isEmpty()) {
+                            valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol));
+                        }
+                        nColumnsToSet++;
+                    }
                     for (i = posOffset; i < table.getPKColumns().size(); i++) {
                         PColumn pkCol = table.getPKColumns().get(i);
                         if (!pkColumnsSet.get(i)) {
@@ -382,7 +439,7 @@ public class UpsertCompiler {
                     select = SubselectRewriter.flatten(select, connection);
                     ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection);
                     select = StatementNormalizer.normalize(select, selectResolver);
-                    select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe);
+                    select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe, useServerTimestampToBe);
                     SelectStatement transformedSelect = SubqueryRewriter.transform(select, selectResolver, connection);
                     if (transformedSelect != select) {
                         selectResolver = FromCompiler.getResolverForQuery(transformedSelect, connection);
@@ -406,10 +463,12 @@ public class UpsertCompiler {
                     if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
                         // We can pipeline the upsert select instead of spooling everything to disk first,
                         // if we don't have any post processing that's required.
-                        parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe);
-                        // If we're in the else, then it's not an aggregate, distinct, limted, or sequence using query,
+                        parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe);
+                        // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
                         // so we might be able to run it entirely on the server side.
-                        runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty());
+                        // For a table with row timestamp column, we can't guarantee that the row key will reside in the
+                        // region space managed by region servers. So we bail out on executing on server side.
+                        runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
                     }
                     // If we may be able to run on the server, add a hint that favors using the data table
                     // if all else is equal.
@@ -488,12 +547,15 @@ public class UpsertCompiler {
         final RowProjector projector = projectorToBe;
         final QueryPlan queryPlan = queryPlanToBe;
         final TableRef tableRef = tableRefToBe;
-        final int[] columnIndexes = columnIndexesToBe;
-        final int[] pkSlotIndexes = pkSlotIndexesToBe;
         final Set<PColumn> addViewColumns = addViewColumnsToBe;
         final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
         final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
-        
+        final int[] columnIndexes = columnIndexesToBe;
+        final int[] pkSlotIndexes = pkSlotIndexesToBe;
+        final boolean useServerTimestamp = useServerTimestampToBe;
+        if (table.getRowTimestampColPos() == -1 && useServerTimestamp) {
+            throw new IllegalStateException("For a table without row timestamp column, useServerTimestamp cannot be true");
+        }
         // TODO: break this up into multiple functions
         ////////////////////////////////////////////////////////////////////
         // UPSERT SELECT
@@ -586,6 +648,7 @@ public class UpsertCompiler {
                     final Scan scan = context.getScan();
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
+                    
                     // Ignore order by - it has no impact
                     final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                     return new MutationPlan() {
@@ -648,7 +711,6 @@ public class UpsertCompiler {
                     };
                 }
             }
-
             ////////////////////////////////////////////////////////////////////
             // UPSERT SELECT run client-side
             /////////////////////////////////////////////////////////////////////
@@ -673,7 +735,7 @@ public class UpsertCompiler {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes);
+                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
                     }
                     try {
                         parallelIteratorFactory.setRowProjector(projector);
@@ -717,9 +779,8 @@ public class UpsertCompiler {
         ////////////////////////////////////////////////////////////////////
         // UPSERT VALUES
         /////////////////////////////////////////////////////////////////////
-        int nodeIndex = 0;
-        // initialze values with constant byte values first
         final byte[][] values = new byte[nValuesToSet][];
+        int nodeIndex = 0;
         if (isTenantSpecific) {
             PName tenantId = connection.getTenantId();
             values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId);
@@ -727,6 +788,7 @@ public class UpsertCompiler {
         if (isSharedViewIndex) {
             values[nodeIndex++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
         }
+        
         final int nodeIndexOffset = nodeIndex;
         // Allocate array based on size of all columns in table,
         // since some values may not be set (if they're nullable).
@@ -817,7 +879,7 @@ public class UpsertCompiler {
                     }
                 }
                 Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }
 
@@ -834,6 +896,17 @@ public class UpsertCompiler {
         };
     }
     
+    private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
+        checkArgument(table.getRowTimestampColPos() != -1, "Call this method only for tables with row timestamp column");
+        int rowTimestampColPKSlot = table.getRowTimestampColPos();
+        for (int pkSlot : pkSlotIndexes) {
+            if (pkSlot == rowTimestampColPKSlot) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
     private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, List<TableRef> selectRefs) {
         long minTimestamp = Long.MAX_VALUE;
         for (TableRef selectRef : selectRefs) {
@@ -889,8 +962,8 @@ public class UpsertCompiler {
     }
     
 
-    private static SelectStatement prependTenantAndViewConstants(PTable table, SelectStatement select, String tenantId, Set<PColumn> addViewColumns) {
-        if ((!table.isMultiTenant() || tenantId == null) && table.getViewIndexId() == null && addViewColumns.isEmpty()) {
+    private static SelectStatement prependTenantAndViewConstants(PTable table, SelectStatement select, String tenantId, Set<PColumn> addViewColumns, boolean useServerTimestamp) {
+        if ((!table.isMultiTenant() || tenantId == null) && table.getViewIndexId() == null && addViewColumns.isEmpty() && !useServerTimestamp) {
             return select;
         }
         List<AliasedNode> selectNodes = newArrayListWithCapacity(select.getSelect().size() + 1 + addViewColumns.size());
@@ -906,7 +979,10 @@ public class UpsertCompiler {
             Object value = column.getDataType().toObject(byteValue, 0, byteValue.length-1);
             selectNodes.add(new AliasedNode(null, new LiteralParseNode(value)));
         }
-        
+        if (useServerTimestamp) {
+            PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
+            selectNodes.add(new AliasedNode(null, getNodeForRowTimestampColumn(rowTimestampCol)));
+        }
         return SelectStatement.create(select, selectNodes);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 2f607cc..0a71bfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -299,7 +299,7 @@ public class WhereOptimizer {
         // we can still use our skip scan. The ScanRanges.create() call will explode
         // out the keys.
         slotSpan = Arrays.copyOf(slotSpan, cnf.size());
-        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, minMaxRange, nBuckets, useSkipScan);
+        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpan, minMaxRange, nBuckets, useSkipScan, table.getRowTimestampColPos());
         context.setScanRanges(scanRanges);
         if (whereClause == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 78f6fb6..8c5cece 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -38,6 +38,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
@@ -295,6 +296,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue VIEW_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES);
     private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
     private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
+    private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
     private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -306,7 +308,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             ARRAY_SIZE_KV,
             VIEW_CONSTANT_KV,
             IS_VIEW_REFERENCED_KV,
-            COLUMN_DEF_KV
+            COLUMN_DEF_KV,
+            IS_ROW_TIMESTAMP_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -321,6 +324,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int VIEW_CONSTANT_INDEX = COLUMN_KV_COLUMNS.indexOf(VIEW_CONSTANT_KV);
     private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
     private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
+    private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
     
     private static final int LINK_TYPE_INDEX = 0;
 
@@ -601,7 +605,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean isViewReferenced = isViewReferencedKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isViewReferencedKv.getValueArray(), isViewReferencedKv.getValueOffset(), isViewReferencedKv.getValueLength()));
         Cell columnDefKv = colKeyValues[COLUMN_DEF_INDEX];
         String expressionStr = columnDefKv==null ? null : (String)PVarchar.INSTANCE.toObject(columnDefKv.getValueArray(), columnDefKv.getValueOffset(), columnDefKv.getValueLength());
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr);
+        
+        Cell isRowTimestampKV = colKeyValues[IS_ROW_TIMESTAMP_INDEX];
+        boolean isRowTimestamp =
+                isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+                        isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
+                        isRowTimestampKV.getValueLength()));
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp);
         columns.add(column);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 013f7a6..c2746b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -73,6 +73,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1 = MIN_TABLE_TIMESTAMP + 5;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0 = MIN_TABLE_TIMESTAMP + 7;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = MIN_TABLE_TIMESTAMP + 9;
     
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
     // a different code for every type of error.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 5b98b5e..25f8271 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -249,6 +249,16 @@ public final class PTableProtos {
      */
     com.google.protobuf.ByteString
         getExpressionBytes();
+
+    // optional bool isRowTimestamp = 13;
+    /**
+     * <code>optional bool isRowTimestamp = 13;</code>
+     */
+    boolean hasIsRowTimestamp();
+    /**
+     * <code>optional bool isRowTimestamp = 13;</code>
+     */
+    boolean getIsRowTimestamp();
   }
   /**
    * Protobuf type {@code PColumn}
@@ -361,6 +371,11 @@ public final class PTableProtos {
               expression_ = input.readBytes();
               break;
             }
+            case 104: {
+              bitField0_ |= 0x00001000;
+              isRowTimestamp_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -647,6 +662,22 @@ public final class PTableProtos {
       }
     }
 
+    // optional bool isRowTimestamp = 13;
+    public static final int ISROWTIMESTAMP_FIELD_NUMBER = 13;
+    private boolean isRowTimestamp_;
+    /**
+     * <code>optional bool isRowTimestamp = 13;</code>
+     */
+    public boolean hasIsRowTimestamp() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    /**
+     * <code>optional bool isRowTimestamp = 13;</code>
+     */
+    public boolean getIsRowTimestamp() {
+      return isRowTimestamp_;
+    }
+
     private void initFields() {
       columnNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       familyNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -660,6 +691,7 @@ public final class PTableProtos {
       viewConstant_ = com.google.protobuf.ByteString.EMPTY;
       viewReferenced_ = false;
       expression_ = "";
+      isRowTimestamp_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -729,6 +761,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00000800) == 0x00000800)) {
         output.writeBytes(12, getExpressionBytes());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeBool(13, isRowTimestamp_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -786,6 +821,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(12, getExpressionBytes());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(13, isRowTimestamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -869,6 +908,11 @@ public final class PTableProtos {
         result = result && getExpression()
             .equals(other.getExpression());
       }
+      result = result && (hasIsRowTimestamp() == other.hasIsRowTimestamp());
+      if (hasIsRowTimestamp()) {
+        result = result && (getIsRowTimestamp()
+            == other.getIsRowTimestamp());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -930,6 +974,10 @@ public final class PTableProtos {
         hash = (37 * hash) + EXPRESSION_FIELD_NUMBER;
         hash = (53 * hash) + getExpression().hashCode();
       }
+      if (hasIsRowTimestamp()) {
+        hash = (37 * hash) + ISROWTIMESTAMP_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getIsRowTimestamp());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1063,6 +1111,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00000400);
         expression_ = "";
         bitField0_ = (bitField0_ & ~0x00000800);
+        isRowTimestamp_ = false;
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
 
@@ -1139,6 +1189,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00000800;
         }
         result.expression_ = expression_;
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.isRowTimestamp_ = isRowTimestamp_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1195,6 +1249,9 @@ public final class PTableProtos {
           expression_ = other.expression_;
           onChanged();
         }
+        if (other.hasIsRowTimestamp()) {
+          setIsRowTimestamp(other.getIsRowTimestamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1729,6 +1786,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional bool isRowTimestamp = 13;
+      private boolean isRowTimestamp_ ;
+      /**
+       * <code>optional bool isRowTimestamp = 13;</code>
+       */
+      public boolean hasIsRowTimestamp() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      /**
+       * <code>optional bool isRowTimestamp = 13;</code>
+       */
+      public boolean getIsRowTimestamp() {
+        return isRowTimestamp_;
+      }
+      /**
+       * <code>optional bool isRowTimestamp = 13;</code>
+       */
+      public Builder setIsRowTimestamp(boolean value) {
+        bitField0_ |= 0x00001000;
+        isRowTimestamp_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool isRowTimestamp = 13;</code>
+       */
+      public Builder clearIsRowTimestamp() {
+        bitField0_ = (bitField0_ & ~0x00001000);
+        isRowTimestamp_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PColumn)
     }
 
@@ -6639,37 +6729,37 @@ public final class PTableProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\014PTable.proto\032\021PGuidePosts.proto\"\373\001\n\007PC" +
+      "\n\014PTable.proto\032\021PGuidePosts.proto\"\223\002\n\007PC" +
       "olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" +
       "NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" +
       "Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" +
       "\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" +
       "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " +
       "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" +
-      "n\030\014 \001(\t\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006" +
-      "values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(" +
-      "\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCo",
-      "unt\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuideP" +
-      "osts\"\357\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
-      "\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 " +
-      "\002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016" +
-      "sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022" +
-      "\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022" +
-      "\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes\030\n " +
-      "\003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010\022 \n" +
-      "\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022dataT" +
-      "ableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyName",
-      "\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTenan" +
-      "t\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStateme" +
-      "nt\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010tenan" +
-      "tId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tindexT" +
-      "ype\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\nsto" +
-      "reNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(\005\022\036" +
-      "\n\026rowKeyOrderOptimizable\030\032 \001(\010*A\n\nPTable" +
-      "Type\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005" +
-      "INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix." +
-      "coprocessor.generatedB\014PTableProtosH\001\210\001\001",
-      "\240\001\001"
+      "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\"\232\001\n\013PTab" +
+      "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" +
+      "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou",
+      "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" +
+      "idePosts\030\006 \001(\0132\014.PGuidePosts\"\357\004\n\006PTable\022" +
+      "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" +
+      "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" +
+      "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" +
+      " \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007" +
+      " \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\013" +
+      "2\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017" +
+      "isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(" +
+      "\0132\014.PTableStats\022\032\n\022dataTableNameBytes\030\r ",
+      "\001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisabl" +
+      "eWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewT" +
+      "ype\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphys" +
+      "icalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013vie" +
+      "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" +
+      "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" +
+      "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" +
+      "imizable\030\032 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000" +
+      "\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020" +
+      "\004B@\n(org.apache.phoenix.coprocessor.gene",
+      "ratedB\014PTableProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6681,7 +6771,7 @@ public final class PTableProtos {
           internal_static_PColumn_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PColumn_descriptor,
-              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", });
+              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", });
           internal_static_PTableStats_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_PTableStats_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 59c8e68..9448443 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -167,7 +167,16 @@ public enum SQLExceptionCode {
       */
      SELECT_COLUMN_NUM_IN_UNIONALL_DIFFS(525, "42902", "SELECT column number differs in a Union All query is not allowed"),
      SELECT_COLUMN_TYPE_IN_UNIONALL_DIFFS(526, "42903", "SELECT column types differ in a Union All query is not allowed"),
-
+     
+     /**
+      * Row timestamp column related errors
+      */
+     ROWTIMESTAMP_ONE_PK_COL_ONLY(527, "42904", "Only one column that is part of the primary key can be declared as a ROW_TIMESTAMP"),
+     ROWTIMESTAMP_PK_COL_ONLY(528, "42905", "Only columns part of the primary key can be declared as a ROW_TIMESTAMP"),
+     ROWTIMESTAMP_CREATE_ONLY(529, "42906", "A column can be added as ROW_TIMESTAMP only in CREATE TABLE"),
+     ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP"),
+     ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views"),
+     INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero"),
      /** 
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 5e58cbd..a639ba2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -201,21 +202,23 @@ public abstract class BaseQueryPlan implements QueryPlan {
             scan.setSmall(true);
         }
         
-        // Set producer on scan so HBase server does round robin processing
-        //setProducer(scan);
-        // Set the time range on the scan so we don't get back rows newer than when the statement was compiled
-        // The time stamp comes from the server at compile time when the meta data
-        // is resolved.
-        // TODO: include time range in explain plan?
         PhoenixConnection connection = context.getConnection();
-        if (context.getScanTimeRange() == null) {
-          Long scn = connection.getSCN();
-          if (scn == null) {
+        // Get the time range of row_timestamp column
+        TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange();
+        // Get the already existing time range on the scan.
+        TimeRange scanTimeRange = scan.getTimeRange();
+        Long scn = connection.getSCN();
+        if (scn == null) {
             scn = context.getCurrentTime();
-          }
-          ScanUtil.setTimeRange(scan, scn);
-        } else {
-            ScanUtil.setTimeRange(scan, context.getScanTimeRange());
+        }
+        try {
+            TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);
+            if (timeRangeToUse == null) {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+            scan.setTimeRange(timeRangeToUse.getMin(), timeRangeToUse.getMax());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
         byte[] tenantIdBytes;
         if( table.isMultiTenant() == true ) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b4c0775..d0ff7bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
@@ -29,6 +30,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.Immutable;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -49,15 +53,30 @@ import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
 import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -184,18 +203,54 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
+    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
+        RowKeySchema schema = table.getRowKeySchema();
+        int rowTimestampColPos = table.getRowTimestampColPos();
+        Field rowTimestampField = schema.getField(rowTimestampColPos); 
+        byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, rowTimestampField.getSortOrder());
+        int oldOffset = ptr.getOffset();
+        int oldLength = ptr.getLength();
+        // Move the pointer to the start byte of the row timestamp pk
+        schema.position(ptr, 0, rowTimestampColPos);
+        byte[] b  = ptr.get();
+        int newOffset = ptr.getOffset();
+        int length = ptr.getLength();
+        for (int i = newOffset; i < newOffset + length; i++) {
+            // modify the underlying bytes array with the bytes of the row timestamp
+            b[i] = rowTimestampBytes[i - newOffset];
+        }
+        // move the pointer back to where it was before.
+        ptr.set(ptr.get(), oldOffset, oldLength);
+        return ptr;
+    }
+    
     private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
+        final PTable table = tableRef.getTable();
+        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
-                (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? 
-                        IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : 
+                (table.isImmutableRows() || includeMutableIndexes) ? 
+                        IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : 
                         Iterators.<PTable>emptyIterator();
         final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
         Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
+        long timestampToUse = timestamp;
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
             ImmutableBytesPtr key = rowEntry.getKey();
-            PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
+            RowMutationState state = rowEntry.getValue();
+            if (tableWithRowTimestampCol) {
+                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
+                if (rowTsColInfo.useServerTimestamp()) {
+                    // regenerate the key with this timestamp.
+                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
+                } else {
+                    if (rowTsColInfo.getTimestamp() != null) {
+                        timestampToUse = rowTsColInfo.getTimestamp();
+                    }
+                }
+            }
+            PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key);
             List<Mutation> rowMutations, rowMutationsPertainingToIndex;
             if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                 row.delete();
@@ -226,13 +281,13 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutations);
+                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(),mutations);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
                 try {
                     indexMutations =
-                            IndexUtil.generateIndexData(tableRef.getTable(), index, mutationsPertainingToIndex,
+                            IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
                                 tempPtr, connection.getKeyValueBuilder(), connection);
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
@@ -538,15 +593,38 @@ public class MutationState implements SQLCloseable {
         return Arrays.copyOf(result, k);
     }
     
+    @Immutable
+    public static class RowTimestampColInfo {
+        private final boolean useServerTimestamp;
+        private final Long rowTimestamp; 
+        
+        public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new RowTimestampColInfo(false, null);
+
+        public RowTimestampColInfo(boolean autoGenerate, Long value) {
+            this.useServerTimestamp = autoGenerate;
+            this.rowTimestamp = value;
+        }
+        
+        public boolean useServerTimestamp() {
+            return useServerTimestamp;
+        }
+        
+        public Long getTimestamp() {
+            return rowTimestamp;
+        }
+    }
+    
     public static class RowMutationState {
-        private Map<PColumn,byte[]> columnValues;
+        @Nonnull private Map<PColumn,byte[]> columnValues;
         private int[] statementIndexes;
-
-        public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) {
-            Preconditions.checkNotNull(columnValues);
-
+        @Nonnull private final RowTimestampColInfo rowTsColInfo;
+        
+        public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex, RowTimestampColInfo rowTsColInfo) {
+            checkNotNull(columnValues);
+            checkNotNull(rowTsColInfo);
             this.columnValues = columnValues;
             this.statementIndexes = new int[] {statementIndex};
+            this.rowTsColInfo = rowTsColInfo;
         }
 
         Map<PColumn, byte[]> getColumnValues() {
@@ -561,6 +639,12 @@ public class MutationState implements SQLCloseable {
             getColumnValues().putAll(newRow.getColumnValues());
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
         }
+        
+        @Nonnull
+        RowTimestampColInfo getRowTimestampColInfo() {
+            return rowTsColInfo;
+        }
+       
     }
     
     public ReadMetricQueue getReadMetricQueue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index c9b3e79..67e2931 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -120,6 +121,12 @@ public abstract class ExplainTable {
             appendKeyRanges(buf);
         }
         planSteps.add(buf.toString());
+        System.out.println("Table row timestamp column position: " + tableRef.getTable().getRowTimestampColPos());
+        System.out.println("Table name:  " + tableRef.getTable().getName().getString());
+        if (context.getScan() != null && tableRef.getTable().getRowTimestampColPos() != -1) {
+            TimeRange range = context.getScan().getTimeRange();
+            planSteps.add("    ROW TIMESTAMP FILTER [" + range.getMin() + ", " + range.getMax() + ")");
+        }
         
         Iterator<Filter> filterIterator = ScanUtil.getFilterIterator(scan);
         if (filterIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 15608de..35707c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -201,7 +201,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
                 }
             };
         }
-        this.scn = JDBCUtil.getCurrentSCN(url, this.info);
+        
+        Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
+        checkScn(scnParam);
+        this.scn = scnParam;
         this.isAutoCommit = JDBCUtil.getAutoCommit(
                 url, this.info,
                 this.services.getProps().getBoolean(
@@ -254,6 +257,12 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
     }
     
+    private static void checkScn(Long scnParam) throws SQLException {
+        if (scnParam != null && scnParam < 0) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_SCN).build().buildException();
+        }
+    }
+
     private static Properties filterKnownNonProperties(Properties info) {
         Properties prunedProperties = info;
         if (info.contains(PhoenixRuntime.CURRENT_SCN_ATTRIB)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index d27a349..3f6958c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -211,6 +211,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final byte[] VIEW_INDEX_ID_BYTES = Bytes.toBytes(VIEW_INDEX_ID);
     public static final String BASE_COLUMN_COUNT = "BASE_COLUMN_COUNT";
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
+    public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
+    public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
 
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 13de06f..9c6a94b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -56,7 +56,6 @@ import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
 import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
 import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
 
 /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
index cde3e9c..ebee43b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
@@ -51,9 +51,10 @@ public class ColumnDef {
     private final boolean isArray;
     private final Integer arrSize;
     private final String expressionStr;
+    private final boolean isRowTimestamp;
  
     ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength,
-    		            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr) {
+    		            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
    	 try {
          Preconditions.checkNotNull(sortOrder);
    	     PDataType localType = null;
@@ -135,13 +136,14 @@ public class ColumnDef {
              this.dataType = localType;
          }
          this.expressionStr = expressionStr;
+         this.isRowTimestamp = isRowTimestamp;
      } catch (SQLException e) {
          throw new ParseException(e);
      }
     }
     ColumnDef(ColumnName columnDefName, String sqlTypeName, Boolean isNull, Integer maxLength,
-            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr) {
-    	this(columnDefName, sqlTypeName, false, 0, isNull, maxLength, scale, isPK, sortOrder, expressionStr);
+            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
+    	this(columnDefName, sqlTypeName, false, 0, isNull, maxLength, scale, isPK, sortOrder, expressionStr, isRowTimestamp);
     }
 
     public ColumnName getColumnDefName() {
@@ -190,6 +192,9 @@ public class ColumnDef {
 		return expressionStr;
 	}
 	
+	public boolean isRowTimestamp() {
+	    return isRowTimestamp;
+	}
 	@Override
     public String toString() {
 	    StringBuilder buf = new StringBuilder(columnDefName.getColumnNode().toString());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDefInPkConstraint.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDefInPkConstraint.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDefInPkConstraint.java
new file mode 100644
index 0000000..41d8868
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDefInPkConstraint.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.schema.SortOrder;
+
+public class ColumnDefInPkConstraint {
+    private final ColumnName columnDefName;
+    private final SortOrder sortOrder;
+    private final boolean isRowTimestamp;
+    
+    public ColumnDefInPkConstraint(ColumnName columnDefName, SortOrder sortOrder, boolean isRowTimestamp) {
+        this.columnDefName = columnDefName;
+        this.sortOrder = sortOrder;
+        this.isRowTimestamp = isRowTimestamp;
+    }
+
+    public ColumnName getColumnName() {
+        return columnDefName;
+    }
+
+    public SortOrder getSortOrder() {
+        return sortOrder;
+    }
+
+    public boolean isRowTimestamp() {
+        return isRowTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index ae2f4c6..b4f612c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -29,9 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.exception.UnknownFunctionException;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.function.AvgAggregateFunction;
@@ -265,17 +262,21 @@ public class ParseNodeFactory {
         return new PropertyName(familyName, propertyName);
     }
 
-    public ColumnDef columnDef(ColumnName columnDefName, String sqlTypeName, boolean isNull, Integer maxLength, Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr) {
-        return new ColumnDef(columnDefName, sqlTypeName, isNull, maxLength, scale, isPK, sortOrder, expressionStr);
+    public ColumnDef columnDef(ColumnName columnDefName, String sqlTypeName, boolean isNull, Integer maxLength, Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
+        return new ColumnDef(columnDefName, sqlTypeName, isNull, maxLength, scale, isPK, sortOrder, expressionStr, isRowTimestamp);
     }
 
     public ColumnDef columnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength, Integer scale, boolean isPK, 
-        	SortOrder sortOrder) {
-        return new ColumnDef(columnDefName, sqlTypeName, isArray, arrSize, isNull, maxLength, scale, isPK, sortOrder, null);
+        	SortOrder sortOrder, boolean isRowTimestamp) {
+        return new ColumnDef(columnDefName, sqlTypeName, isArray, arrSize, isNull, maxLength, scale, isPK, sortOrder, null, isRowTimestamp);
     }
-
-    public PrimaryKeyConstraint primaryKey(String name, List<Pair<ColumnName, SortOrder>> columnNameAndSortOrder) {
-        return new PrimaryKeyConstraint(name, columnNameAndSortOrder);
+    
+    public ColumnDefInPkConstraint columnDefInPkConstraint(ColumnName columnDefName, SortOrder sortOrder, boolean isRowTimestamp) {
+        return new ColumnDefInPkConstraint(columnDefName, sortOrder, isRowTimestamp);
+    }
+    
+    public PrimaryKeyConstraint primaryKey(String name, List<ColumnDefInPkConstraint> columnDefs) {
+        return new PrimaryKeyConstraint(name, columnDefs);
     }
     
     public IndexKeyConstraint indexKey( List<Pair<ParseNode, SortOrder>> parseNodeAndSortOrder) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/parse/PrimaryKeyConstraint.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/PrimaryKeyConstraint.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/PrimaryKeyConstraint.java
index ae26f62..3128af8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/PrimaryKeyConstraint.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/PrimaryKeyConstraint.java
@@ -17,28 +17,48 @@
  */
 package org.apache.phoenix.parse;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.SortOrder;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
-import org.apache.phoenix.schema.SortOrder;
 
 public class PrimaryKeyConstraint extends NamedNode {
-    public static final PrimaryKeyConstraint EMPTY = new PrimaryKeyConstraint(null, Collections.<Pair<ColumnName, SortOrder>>emptyList());
+    public static final PrimaryKeyConstraint EMPTY = new PrimaryKeyConstraint(null, Collections.<ColumnDefInPkConstraint>emptyList());
 
     private final List<Pair<ColumnName, SortOrder>> columns;
-    private final HashMap<ColumnName, Pair<ColumnName, SortOrder>> columnNameToSortOrder;
+    private final Map<ColumnName, Pair<ColumnName, SortOrder>> columnNameToSortOrder;
+    private final Map<ColumnName, Pair<ColumnName, Boolean>> columnNameToRowTimestamp;
+    private final int numColumnsWithRowTimestamp;
     
-    PrimaryKeyConstraint(String name, List<Pair<ColumnName, SortOrder>> columns) {
+    PrimaryKeyConstraint(String name, List<ColumnDefInPkConstraint> columnDefs) {
         super(name);
-        this.columns = columns == null ? Collections.<Pair<ColumnName, SortOrder>>emptyList() : ImmutableList.copyOf(columns);
-        this.columnNameToSortOrder = Maps.newHashMapWithExpectedSize(this.columns.size());
-        for (Pair<ColumnName, SortOrder> p : this.columns) {
-            this.columnNameToSortOrder.put(p.getFirst(), p);
+        if (columnDefs == null) {
+            this.columns = Collections.<Pair<ColumnName, SortOrder>>emptyList();
+            this.columnNameToSortOrder = Collections.<ColumnName, Pair<ColumnName, SortOrder>>emptyMap();
+            this.columnNameToRowTimestamp = Collections.<ColumnName, Pair<ColumnName, Boolean>>emptyMap();
+            numColumnsWithRowTimestamp = 0;
+        } else {
+            int numRowTimestampCols = 0;
+            List<Pair<ColumnName, SortOrder>> l = new ArrayList<>(columnDefs.size());
+            this.columnNameToSortOrder = Maps.newHashMapWithExpectedSize(columnDefs.size());
+            this.columnNameToRowTimestamp = Maps.newHashMapWithExpectedSize(columnDefs.size());
+            for (ColumnDefInPkConstraint colDef : columnDefs) {
+                Pair<ColumnName, SortOrder> p = Pair.newPair(colDef.getColumnName(), colDef.getSortOrder());
+                l.add(p);
+                this.columnNameToSortOrder.put(colDef.getColumnName(), p);
+                this.columnNameToRowTimestamp.put(colDef.getColumnName(), Pair.newPair(colDef.getColumnName(), colDef.isRowTimestamp()));
+                if (colDef.isRowTimestamp()) {
+                    numRowTimestampCols++;
+                }
+            }
+            this.numColumnsWithRowTimestamp = numRowTimestampCols;
+            this.columns = ImmutableList.copyOf(l); 
         }
     }
 
@@ -46,14 +66,22 @@ public class PrimaryKeyConstraint extends NamedNode {
         return columns;
     }
     
-    public Pair<ColumnName, SortOrder> getColumn(ColumnName columnName) {
+    public Pair<ColumnName, SortOrder> getColumnWithSortOrder(ColumnName columnName) {
     	return columnNameToSortOrder.get(columnName);
     }
     
+    public boolean isColumnRowTimestamp(ColumnName columnName) {
+        return columnNameToRowTimestamp.get(columnName) != null && columnNameToRowTimestamp.get(columnName).getSecond() == Boolean.TRUE;
+    }
+    
     public boolean contains(ColumnName columnName) {
         return columnNameToSortOrder.containsKey(columnName);
     }
     
+    public int getNumColumnsWithRowTimestamp() {
+        return numColumnsWithRowTimestamp;
+    }
+    
     @Override
     public int hashCode() {
         return super.hashCode();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4cd1ac5..af51200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1934,9 +1934,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
                                 
                                 String columnsToAdd = "";
+                                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+                                    columnsToAdd += PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                }
                                 if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
                                    // We know that we always need to add the STORE_NULLS column for 4.3 release
                                    // NOTE(review): the plain '=' on the next added line discards the
                                    // IS_ROW_TIMESTAMP column appended above (any timestamp < 4.3.0 is also
                                    // < 4.6.0) and leaves columnsToAdd starting with ", " — this likely needs '+='.
-                                    columnsToAdd = PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
+                                    columnsToAdd = ", " + PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName();
                                     HBaseAdmin admin = null;
                                     try {
                                         admin = getAdmin();
@@ -1969,7 +1972,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
                                             + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName();
                                 }
-
+                                
                                 // If we have some new columns from 4.1-4.6 to add, add them now.
                                 if (!columnsToAdd.isEmpty()) {
                                     // Ugh..need to assign to another local variable to keep eclipse happy.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 92479b3..268bfc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -52,6 +52,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
@@ -249,6 +250,8 @@ public interface QueryConstants {
             INDEX_DISABLE_TIMESTAMP + " BIGINT," +
             STORE_NULLS + " BOOLEAN," +
             BASE_COLUMN_COUNT + " INTEGER," +
+            // Column metadata (will be null for table row)
+            IS_ROW_TIMESTAMP + " BOOLEAN, " + 
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index b4671db..2de8a5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -75,4 +75,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
     public String getExpressionStr() {
         return getDelegate().getExpressionStr();
     }
+
+    @Override
+    public boolean isRowTimestamp() {
+        return getDelegate().isRowTimestamp();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 0251da1..203c5e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -246,4 +246,9 @@ public class DelegateTable implements PTable {
     public boolean rowKeyOrderOptimizable() {
         return delegate.rowKeyOrderOptimizable();
     }
+
+    @Override
+    public int getRowTimestampColPos() {
+        return delegate.getRowTimestampColPos();
+    }
 }


Mime
View raw message