phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-1812 Only sync table metadata when necessary
Date Mon, 31 Aug 2015 17:28:56 GMT
Repository: phoenix
Updated Branches:
  refs/heads/txn 81e52e85b -> 92ee51a0d


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 1608548..f2423ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -145,6 +145,7 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -164,6 +165,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,7 +297,7 @@ public class MetaDataClient {
         MetaDataMutationResult result = updateCache(schemaName, tableName, true);
         return result.getMutationTime();
     }
-
+    
     /**
      * Update the cache with the latest as of the connection scn.
      * @param schemaName
@@ -314,30 +316,50 @@ public class MetaDataClient {
     public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException {
         return updateCache(tenantId, schemaName, tableName, false);
     }
-
-    private long getClientTimeStamp() {
-        Long scn = connection.getSCN();
-        long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
-        return clientTimeStamp;
+    
+    public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+        return updateCache(tenantId, schemaName, tableName, alwaysHitServer, null);
     }
-
+    
+    private long getCurrentScn() {
+    	Long scn = connection.getSCN();
+        long currentScn = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        return currentScn;
+    }
+    
     private MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName,
-            boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez
-        long clientTimeStamp = getClientTimeStamp();
+            boolean alwaysHitServer, Long resolvedTimestamp) throws SQLException { // TODO: pass byte[] herez
         boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
         // System tables must always have a null tenantId
         tenantId = systemTable ? null : tenantId;
         PTable table = null;
+        PTableRef tableRef = null;
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         long tableTimestamp = HConstants.LATEST_TIMESTAMP;
+        long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP;
         try {
-            table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, fullTableName));
+            tableRef = connection.getTableRef(new PTableKey(tenantId, fullTableName));
+            table = tableRef.getTable();
             tableTimestamp = table.getTimeStamp();
+            tableResolvedTimestamp = tableRef.getResolvedTimeStamp();
         } catch (TableNotFoundException e) {
         }
-        // Don't bother with server call: we can't possibly find a newer table
-        if (table != null && !alwaysHitServer && (systemTable || tableTimestamp == clientTimeStamp - 1)) {
-            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,table);
+        
+        boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean(
+							                QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+							                QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+        // start a txn if all table are transactional by default or if we found the table in the cache and it is transactional
+        // TODO if system tables become transactional remove the check 
+        boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional());
+        if (!systemTable && isTransactional && connection.getMutationState().getTransaction()==null)
+        	connection.getMutationState().startTransaction();
+        resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
+        // Do not make rpc to getTable if 
+        // 1. table is a system table
+        // 2. table was already resolved as of that timestamp
+		if (table != null && !alwaysHitServer
+				&& (systemTable || resolvedTimestamp == tableResolvedTimestamp)) {
+            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
         }
 
         int maxTryCount = tenantId == null ? 1 : 2;
@@ -347,7 +369,12 @@ public class MetaDataClient {
         do {
             final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
             final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
-            result = connection.getQueryServices().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimeStamp);
+            ConnectionQueryServices queryServices = connection.getQueryServices();
+			result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, resolvedTimestamp);
+			// if the table was assumed to be transactional, but is actually not transactional then re-resolve as of the right timestamp (and vice versa) 
+			if (table==null && result.getTable()!=null && result.getTable().isTransactional()!=isTransactional) {
+				result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, TransactionUtil.getResolvedTimestamp(connection, result.getTable().isTransactional(), HConstants.LATEST_TIMESTAMP));
+			}
 
             if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
                 return result;
@@ -372,14 +399,19 @@ public class MetaDataClient {
                 // server again.
                 if (table != null) {
                     // Ensures that table in result is set to table found in our cache.
-                    result.setTable(table);
                     if (code == MutationCode.TABLE_ALREADY_EXISTS) {
-                        // Although this table is up-to-date, the parent table may not be.
-                        // In this case, we update the parent table which may in turn pull
-                        // in indexes to add to this table.
-                        if (addIndexesFromPhysicalTable(result)) {
-                            connection.addTable(result.getTable());
-                        }
+                    	result.setTable(table);
+                    	// Although this table is up-to-date, the parent table may not be.
+                    	// In this case, we update the parent table which may in turn pull
+                    	// in indexes to add to this table.
+                    	long resolvedTime = TransactionUtil.getResolvedTime(connection, result);
+						if (addIndexesFromPhysicalTable(result, resolvedTimestamp)) {
+                    	    connection.addTable(result.getTable(), resolvedTime);
+                    	}
+                    	else {
+                        	// if we aren't adding the table, we still need to update the resolved time of the table 
+                        	connection.updateResolvedTimestamp(table, resolvedTime);
+                    	}
                         return result;
                     }
                     // If table was not found at the current time stamp and we have one cached, remove it.
@@ -400,10 +432,11 @@ public class MetaDataClient {
      * of the table for which we just updated.
      * TODO: combine this round trip with the one that updates the cache for the child table.
      * @param result the result from updating the cache for the current table.
+     * @param resolvedTimestamp timestamp at which child table was resolved
      * @return true if the PTable contained by result was modified and false otherwise
      * @throws SQLException if the physical table cannot be found
      */
-    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException {
+    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result, Long resolvedTimestamp) throws SQLException {
         PTable table = result.getTable();
         // If not a view or if a view directly over an HBase table, there's nothing to do
         if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) {
@@ -412,7 +445,7 @@ public class MetaDataClient {
         String physicalName = table.getPhysicalName().getString();
         String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
         String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
-        MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false);
+        MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false, resolvedTimestamp);
         PTable physicalTable = parentResult.getTable();
         if (physicalTable == null) {
             throw new TableNotFoundException(schemaName, tableName);
@@ -1085,19 +1118,19 @@ public class MetaDataClient {
                 // as there's no need to burn another sequence value.
                 if (allocateIndexId && indexId == null) {
                     Long scn = connection.getSCN();
-                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                     PName tenantId = connection.getTenantId();
                     String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
                     PName physicalName = dataTable.getPhysicalName();
                     int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
                     SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets);
-                    // Create at parent timestamp as we know that will be earlier than now
-                    // and earlier than any SCN if one is set.
+                    // if scn is set create at scn-1, so we can see the sequence or else use latest timestamp (so that latest server time is used)
+                    long sequenceTimestamp = scn!=null ? scn-1 : HConstants.LATEST_TIMESTAMP;
                     createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
                         true, Short.MIN_VALUE, 1, 1, false, Long.MIN_VALUE, Long.MAX_VALUE,
-                        dataTable.getTimeStamp());
+                        sequenceTimestamp);
                     long[] seqValues = new long[1];
                     SQLException[] sqlExceptions = new SQLException[1];
+                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                     connection.getQueryServices().incrementSequences(Collections.singletonList(key), timestamp, seqValues, sqlExceptions);
                     if (sqlExceptions[0] != null) {
                         throw sqlExceptions[0];
@@ -1212,8 +1245,10 @@ public class MetaDataClient {
             boolean isImmutableRows = false;
             List<PName> physicalNames = Collections.emptyList();
             boolean addSaltColumn = false;
+            Long timestamp = null;
             if (parent != null) {
                 transactional = parent.isTransactional();
+                timestamp = TransactionUtil.getTableTimestamp(connection, transactional, null);
                 storeNulls = parent.getStoreNulls();
                 if (tableType == PTableType.INDEX) {
                     // Index on view
@@ -1246,7 +1281,7 @@ public class MetaDataClient {
                     incrementStatement.execute();
                     // Get list of mutations and add to table meta data that will be passed to server
                     // to guarantee order. This row will always end up last
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                     connection.rollback();
     
                     // Add row linking from data table row to index table row
@@ -1366,6 +1401,7 @@ public class MetaDataClient {
             		.build().buildException();
             	}
             }
+            timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp;
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views
             if (statement.getTableType() == PTableType.VIEW || indexId != null) {
@@ -1600,7 +1636,7 @@ public class MetaDataClient {
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
                         Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, false);
-                connection.addTable(table);
+                connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
                     int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size();
@@ -1655,7 +1691,7 @@ public class MetaDataClient {
                 addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName, pkName, keySeq, saltBucketNum != null);
             }
 
-            tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
@@ -1699,7 +1735,7 @@ public class MetaDataClient {
             tableUpsert.setBoolean(20, transactional);
             tableUpsert.execute();
 
-            tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             /*
@@ -1747,7 +1783,7 @@ public class MetaDataClient {
             default:
                 PName newSchemaName = PNameFactory.newName(schemaName);
                 PTable table =  PTableImpl.makePTable(
-                        tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(),
+                        tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(),
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
@@ -1862,6 +1898,8 @@ public class MetaDataClient {
 
             MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
             MutationCode code = result.getMutationCode();
+            PTable table = result.getTable();
+			boolean transactional = table!=null && table.isTransactional();
             switch (code) {
             case TABLE_NOT_FOUND:
                 if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); }
@@ -1873,12 +1911,11 @@ public class MetaDataClient {
 
                 .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
-                connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
-                        result.getMutationTime());
+				connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
+                        TransactionUtil.getTableTimestamp(connection, transactional, result.getMutationTime()));
 
                 if (result.getTable() != null && tableType != PTableType.VIEW) {
                     connection.setAutoCommit(true);
-                    PTable table = result.getTable();
                     boolean dropMetaData = result.getTable().getViewIndexId() == null &&
                             connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
                     long ts = (scn == null ? result.getMutationTime() : scn);
@@ -2094,7 +2131,9 @@ public class MetaDataClient {
 
             ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps();
             Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
-            PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable();
+            TableRef tableRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
+			PTable table = tableRef.getTable();
+            Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
             List<ColumnDef> columnDefs = statement.getColumnDefs();
             if (columnDefs == null) {
                 columnDefs = Collections.emptyList();
@@ -2236,7 +2275,7 @@ public class MetaDataClient {
                         }
                     }
 
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 } else {
                     // Check that HBase configured properly for mutable secondary indexing
@@ -2260,13 +2299,13 @@ public class MetaDataClient {
                     for (PTable index : table.getIndexes()) {
                         incrementTableSeqNum(index, index.getType(), 1);
                     }
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 long seqNum = table.getSequenceNumber();
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) { 
                     seqNum = incrementTableSeqNum(table, statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant, storeNulls);
-                    tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 
@@ -2305,7 +2344,17 @@ public class MetaDataClient {
                     // Only update client side cache if we aren't adding a PK column to a table with indexes.
                     // We could update the cache manually then too, it'd just be a pain.
                     if (numPkColumnsAdded==0 || table.getIndexes().isEmpty()) {
-                        connection.addColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, disableWAL == null ? table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() : multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls);
+						connection.addColumn(
+								tenantId,
+								SchemaUtil.getTableName(schemaName, tableName),
+								columns,
+								result.getMutationTime(),
+								seqNum,
+								isImmutableRows == null ? table.isImmutableRows() : isImmutableRows,
+								disableWAL == null ? table.isWALDisabled() : disableWAL,
+								multiTenant == null ? table.isMultiTenant() : multiTenant,
+								storeNulls == null ? table.getStoreNulls() : storeNulls, 
+								TransactionUtil.getResolvedTime(connection, result));
                     }
                     // Delete rows in view index if we haven't dropped it already
                     // We only need to do this if the multiTenant transitioned to false
@@ -2436,12 +2485,12 @@ public class MetaDataClient {
             boolean retried = false;
             while (true) {
                 final ColumnResolver resolver = FromCompiler.getResolver(statement, connection);
-                PTable table = resolver.getTables().get(0).getTable();
+                TableRef tableRef = resolver.getTables().get(0);
+				PTable table = tableRef.getTable();
                 List<ColumnName> columnRefs = statement.getColumnRefs();
                 if(columnRefs == null) {
                     columnRefs = Lists.newArrayListWithCapacity(0);
                 }
-                TableRef tableRef = null;
                 List<ColumnRef> columnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size() + table.getIndexes().size());
                 List<TableRef> indexesToDrop = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
                 List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + table.getColumns().size() - columnRefs.size()));
@@ -2457,30 +2506,31 @@ public class MetaDataClient {
                         }
                         throw e;
                     }
-                    tableRef = columnRef.getTableRef();
                     PColumn columnToDrop = columnRef.getColumn();
                     tableColumnsToDrop.add(columnToDrop);
                     if (SchemaUtil.isPKColumn(columnToDrop)) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_DROP_PK)
                             .setColumnName(columnToDrop.getName().getString()).build().buildException();
                     }
-                    columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition()));
+                    columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition()));
                 }
 
                 dropColumnMutations(table, tableColumnsToDrop, tableMetaData);
                 for (PTable index : table.getIndexes()) {
+                	IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
+                	// get the columns required for the index pk
+                	Set<ColumnReference> indexColumns = indexMaintainer.getIndexedColumns();
+                	// get the covered columns 
+                	Set<ColumnReference> coveredColumns = indexMaintainer.getCoverededColumns();
                     List<PColumn> indexColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size());
                     for(PColumn columnToDrop : tableColumnsToDrop) {
-                        String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop);
-                        try {
-                            PColumn indexColumn = index.getColumn(indexColumnName);
-                            if (SchemaUtil.isPKColumn(indexColumn)) {
-                                indexesToDrop.add(new TableRef(index));
-                            } else {
-                                indexColumnsToDrop.add(indexColumn);
-                                columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition()));
-                            }
-                        } catch (ColumnNotFoundException e) {
+                    	ColumnReference columnToDropRef = new ColumnReference(columnToDrop.getFamilyName().getBytes(), columnToDrop.getName().getBytes());
+                    	if (indexColumns.contains(columnToDropRef)) {
+                    		indexesToDrop.add(new TableRef(index));
+                    	} 
+                    	else if (coveredColumns.contains(columnToDropRef)) {
+                    		String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop);
+                    		indexColumnsToDrop.add(index.getColumn(indexColumnName));
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
@@ -2489,11 +2539,12 @@ public class MetaDataClient {
                     }
 
                 }
-                tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null;
+                tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
                 long seqNum = incrementTableSeqNum(table, statement.getTableType(), -1);
-                tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
                 Collections.reverse(tableMetaData);
@@ -2544,7 +2595,7 @@ public class MetaDataClient {
                     // client-side cache as it would be too painful. Just let it pull it over from
                     // the server when needed.
                     if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) {
-                        connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum);
+                        connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
                     }
                     // If we have a VIEW, then only delete the metadata, and leave the table data alone
                     if (table.getType() != PTableType.VIEW) {
@@ -2586,7 +2637,7 @@ public class MetaDataClient {
                     if (retried) {
                         throw e;
                     }
-                    table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, fullTableName));
+                    table = connection.getTable(new PTableKey(tenantId, fullTableName));
                     retried = true;
                 }
             }
@@ -2629,7 +2680,8 @@ public class MetaDataClient {
                     tableUpsert.close();
                 }
             }
-            List<Mutation> tableMetadata = connection.getMutationState().toMutations().next().getSecond();
+            Long timeStamp = indexRef.getTable().isTransactional() ? indexRef.getTimeStamp() : null;
+			List<Mutation> tableMetadata = connection.getMutationState().toMutations(timeStamp).next().getSecond();
             connection.rollback();
 
             MetaDataMutationResult result = connection.getQueryServices().updateIndexState(tableMetadata, dataTableName);
@@ -2675,9 +2727,9 @@ public class MetaDataClient {
     }
 
     private PTable addTableToCache(MetaDataMutationResult result) throws SQLException {
-        addIndexesFromPhysicalTable(result);
+        addIndexesFromPhysicalTable(result, null);
         PTable table = result.getTable();
-        connection.addTable(table);
+        connection.addTable(table, TransactionUtil.getResolvedTime(connection, result));
         return table;
     }
     
@@ -2700,13 +2752,14 @@ public class MetaDataClient {
          */
         boolean isSharedIndex = table.getViewIndexId() != null;
         if (isSharedIndex) {
-            return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp());
+        	// we are assuming the stats table is not transactional
+            return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getCurrentScn());
         }
         boolean isView = table.getType() == PTableType.VIEW;
         String physicalName = table.getPhysicalName().getString();
         if (isView && table.getViewType() != ViewType.MAPPED) {
             try {
-                return connection.getMetaDataCache().getTable(new PTableKey(null, physicalName)).getTableStats();
+                return connection.getTable(new PTableKey(null, physicalName)).getTableStats();
             } catch (TableNotFoundException e) {
                 // Possible when the table timestamp == current timestamp - 1.
                 // This would be most likely during the initial index build of a view index

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
index c104473..207bc2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema;
 
+import java.sql.SQLException;
+
 import org.apache.phoenix.query.MetaDataMutated;
 
 
@@ -26,6 +28,6 @@ public interface PMetaData extends MetaDataMutated, Iterable<PTable>, Cloneable
     }
     public int size();
     public PMetaData clone();
-    public PTable getTable(PTableKey key) throws TableNotFoundException;
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException;
     public PMetaData pruneTables(Pruner pruner);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 2f84c95..8cfbb18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -40,32 +40,12 @@ import com.google.common.primitives.Longs;
  *
  */
 public class PMetaDataImpl implements PMetaData {
-        private static final class PTableRef {
-            public final PTable table;
-            public final int estSize;
-            public volatile long lastAccessTime;
-            
-            public PTableRef(PTable table, long lastAccessTime, int estSize) {
-                this.table = table;
-                this.lastAccessTime = lastAccessTime;
-                this.estSize = estSize;
-            }
-
-            public PTableRef(PTable table, long lastAccessTime) {
-                this (table, lastAccessTime, table.getEstimatedSize());
-            }
-
-            public PTableRef(PTableRef tableRef) {
-                this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize);
-            }
-        }
-
         private static class PTableCache implements Cloneable {
             private static final int MIN_REMOVAL_SIZE = 3;
             private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
                 @Override
                 public int compare(PTableRef tableRef1, PTableRef tableRef2) {
-                    return Longs.compare(tableRef1.lastAccessTime, tableRef2.lastAccessTime);
+                    return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime());
                 }
             };
             private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
@@ -88,7 +68,7 @@ public class PMetaDataImpl implements PMetaData {
                 Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity));
                 // Copy value so that access time isn't changing anymore
                 for (PTableRef tableAccess : tables.values()) {
-                    newTables.put(tableAccess.table.getKey(), new PTableRef(tableAccess));
+                    newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess));
                 }
                 return newTables;
             }
@@ -114,7 +94,7 @@ public class PMetaDataImpl implements PMetaData {
                 if (tableAccess == null) {
                     return null;
                 }
-                tableAccess.lastAccessTime = timeKeeper.getCurrentTime();
+                tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
                 return tableAccess;
             }
             
@@ -138,37 +118,37 @@ public class PMetaDataImpl implements PMetaData {
                 // Add to new cache, but track references to remove when done
                 // to bring cache at least overage amount below it's max size.
                 for (PTableRef tableRef : tables.values()) {
-                    newCache.put(tableRef.table.getKey(), new PTableRef(tableRef));
+                    newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef));
                     toRemove.add(tableRef);
-                    toRemoveBytes += tableRef.estSize;
-                    if (toRemoveBytes - toRemove.peekLast().estSize > overage) {
+                    toRemoveBytes += tableRef.getEstSize();
+                    if (toRemoveBytes - toRemove.peekLast().getEstSize() > overage) {
                         PTableRef removedRef = toRemove.removeLast();
-                        toRemoveBytes -= removedRef.estSize;
+                        toRemoveBytes -= removedRef.getEstSize();
                     }
                 }
                 for (PTableRef toRemoveRef : toRemove) {
-                    newCache.remove(toRemoveRef.table.getKey());
+                    newCache.remove(toRemoveRef.getTable().getKey());
                 }
                 return newCache;
             }
 
             private PTable put(PTableKey key, PTableRef ref) {
-                currentByteSize += ref.estSize;
+                currentByteSize += ref.getEstSize();
                 PTableRef oldTableAccess = tables.put(key, ref);
                 PTable oldTable = null;
                 if (oldTableAccess != null) {
-                    currentByteSize -= oldTableAccess.estSize;
-                    oldTable = oldTableAccess.table;
+                    currentByteSize -= oldTableAccess.getEstSize();
+                    oldTable = oldTableAccess.getTable();
                 }
                 return oldTable;
             }
 
-            public PTable put(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime()));
+            public PTable put(PTableKey key, PTable value, long resolvedTime) {
+                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime));
             }
             
-            public PTable putDuplicate(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0));
+            public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) {
+                return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime));
             }
             
             public PTable remove(PTableKey key) {
@@ -176,8 +156,8 @@ public class PMetaDataImpl implements PMetaData {
                 if (value == null) {
                     return null;
                 }
-                currentByteSize -= value.estSize;
-                return value.table;
+                currentByteSize -= value.getEstSize();
+                return value.getTable();
             }
             
             public Iterator<PTable> iterator() {
@@ -191,7 +171,7 @@ public class PMetaDataImpl implements PMetaData {
 
                     @Override
                     public PTable next() {
-                        return iterator.next().table;
+                        return iterator.next().getTable();
                     }
 
                     @Override
@@ -235,12 +215,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PTable getTable(PTableKey key) throws TableNotFoundException {
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
         PTableRef ref = metaData.get(key);
         if (ref == null) {
             throw new TableNotFoundException(key.getName());
         }
-        return ref.table;
+        return ref;
     }
 
     @Override
@@ -248,22 +228,29 @@ public class PMetaDataImpl implements PMetaData {
         return metaData.size();
     }
 
+    @Override
+    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+    	PTableCache clone = metaData.clone();
+    	clone.putDuplicate(table.getKey(), table, resolvedTimestamp);
+    	return new PMetaDataImpl(clone);
+    }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
+    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
         int netGain = 0;
         PTableKey key = table.getKey();
         PTableRef oldTableRef = metaData.get(key);
         if (oldTableRef != null) {
-            netGain -= oldTableRef.estSize;
+            netGain -= oldTableRef.getEstSize();
         }
         PTable newParentTable = null;
+        long parentResolvedTimestamp = resolvedTime;
         if (table.getParentName() != null) { // Upsert new index table into parent data table list
             String parentName = table.getParentName().getString();
             PTableRef oldParentRef = metaData.get(new PTableKey(table.getTenantId(), parentName));
             // If parentTable isn't cached, that's ok we can skip this
             if (oldParentRef != null) {
-                List<PTable> oldIndexes = oldParentRef.table.getIndexes();
+                List<PTable> oldIndexes = oldParentRef.getTable().getIndexes();
                 List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1);
                 newIndexes.addAll(oldIndexes);
                 for (int i = 0; i < newIndexes.size(); i++) {
@@ -274,8 +261,8 @@ public class PMetaDataImpl implements PMetaData {
                     }
                 }
                 newIndexes.add(table);
-                netGain -= oldParentRef.estSize;
-                newParentTable = PTableImpl.makePTable(oldParentRef.table, table.getTimeStamp(), newIndexes);
+                netGain -= oldParentRef.getEstSize();
+                newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes);
                 netGain += newParentTable.getEstimatedSize();
             }
         }
@@ -286,24 +273,24 @@ public class PMetaDataImpl implements PMetaData {
         PTableCache tables = overage <= 0 ? metaData.clone() : metaData.cloneMinusOverage(overage);
         
         if (newParentTable != null) { // Upsert new index table into parent data table list
-            tables.put(newParentTable.getKey(), newParentTable);
-            tables.putDuplicate(table.getKey(), table);
+            tables.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
+            tables.putDuplicate(table.getKey(), table, resolvedTime);
         } else {
-            tables.put(table.getKey(), table);
+            tables.put(table.getKey(), table, resolvedTime);
         }
         for (PTable index : table.getIndexes()) {
-            tables.putDuplicate(index.getKey(), index);
+            tables.putDuplicate(index.getKey(), index, resolvedTime);
         }
         return new PMetaDataImpl(tables);
     }
 
     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
+    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException {
         PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (oldTableRef == null) {
             return this;
         }
-        List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.table);
+        List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.getTable());
         List<PColumn> newColumns;
         if (columnsToAdd.isEmpty()) {
             newColumns = oldColumns;
@@ -312,8 +299,8 @@ public class PMetaDataImpl implements PMetaData {
             newColumns.addAll(oldColumns);
             newColumns.addAll(columnsToAdd);
         }
-        PTable newTable = PTableImpl.makePTable(oldTableRef.table, tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
-        return addTable(newTable);
+        PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls);
+        return addTable(newTable, resolvedTime);
     }
 
     @Override
@@ -340,7 +327,7 @@ public class PMetaDataImpl implements PMetaData {
         }
         // also remove its reference from parent table
         if (parentTableRef != null) {
-            List<PTable> oldIndexes = parentTableRef.table.getIndexes();
+            List<PTable> oldIndexes = parentTableRef.getTable().getIndexes();
             if(oldIndexes != null && !oldIndexes.isEmpty()) {
                 List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size());
                 newIndexes.addAll(oldIndexes);
@@ -349,13 +336,13 @@ public class PMetaDataImpl implements PMetaData {
                     if (index.getName().getString().equals(tableName)) {
                         newIndexes.remove(i);
                         PTable parentTable = PTableImpl.makePTable(
-                                parentTableRef.table,
-                                tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.table.getTimeStamp() : tableTimeStamp,
+                                parentTableRef.getTable(),
+                                tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
                                 newIndexes);
                         if (tables == null) { 
                             tables = metaData.clone();
                         }
-                        tables.put(parentTable.getKey(), parentTable);
+                        tables.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
                         break;
                     }
                 }
@@ -365,12 +352,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException {
+    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
         PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (tableRef == null) {
             return this;
         }
-        PTable table = tableRef.table;
+        PTable table = tableRef.getTable();
         PTableCache tables = metaData.clone();
         for (PColumn columnToRemove : columnsToRemove) {
             PColumn column;
@@ -399,7 +386,7 @@ public class PMetaDataImpl implements PMetaData {
             
             table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
         }
-        tables.put(table.getKey(), table);
+        tables.put(table.getKey(), table, resolvedTime);
         return new PMetaDataImpl(tables);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
new file mode 100644
index 0000000..83d0b42
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+public class PTableRef {
+    private final PTable table;
+    private final int estSize;
+	private volatile long lastAccessTime;
+	// timestamp (scn or txn timestamp at which rpc to fetch the table was made)
+    private long resolvedTimeStamp;
+    
+    public PTableRef(PTable table, long lastAccessTime, int estSize, long resolvedTime) {
+        this.table = table;
+        this.lastAccessTime = lastAccessTime;
+        this.estSize = estSize;
+        this.resolvedTimeStamp = resolvedTime;
+    }
+
+    public PTableRef(PTable table, long lastAccessTime, long resolvedTime) {
+        this (table, lastAccessTime, table.getEstimatedSize(), resolvedTime);
+    }
+
+    public PTableRef(PTableRef tableRef) {
+        this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize, tableRef.resolvedTimeStamp);
+    }
+    
+    public PTable getTable() {
+		return table;
+	}
+
+	public long getResolvedTimeStamp() {
+		return resolvedTimeStamp;
+	}
+	
+    public int getEstSize() {
+		return estSize;
+	}
+
+	public long getLastAccessTime() {
+		return lastAccessTime;
+	}
+
+	public void setLastAccessTime(long lastAccessTime) {
+		this.lastAccessTime = lastAccessTime;
+	}
+	
+	public void setResolvedTimeStamp(long resolvedTimeStamp) {
+		this.resolvedTimeStamp = resolvedTimeStamp;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index ec3e64b..316b6ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -113,7 +113,7 @@ public class TableRef {
         if (obj == null) return false;
         if (getClass() != obj.getClass()) return false;
         TableRef other = (TableRef)obj;
-        // FIXME: a null alias on either side should mean a wildcard and should not fail the equals check
+        // a null alias on either side should mean a wildcard and should not fail the equals check
         if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false;
         if (!table.getName().getString().equals(other.table.getName().getString())) return false;
         return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index b030510..4fdd597 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -268,7 +268,7 @@ public class PhoenixRuntime {
      */
     public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException {
         final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(includeMutableIndexes);
+        final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(includeMutableIndexes, null);
         return new Iterator<Pair<byte[],List<KeyValue>>>() {
 
             @Override
@@ -304,7 +304,7 @@ public class PhoenixRuntime {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         try {
             name = SchemaUtil.normalizeIdentifier(name);
-            table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), name));
+            table = pconn.getTable(new PTableKey(pconn.getTenantId(), name));
         } catch (TableNotFoundException e) {
             String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
             String tableName = SchemaUtil.getTableNameFromFullName(name);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 47db678..4dc792f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -489,15 +489,10 @@ public class SchemaUtil {
     }
 
     protected static PhoenixConnection addMetaDataColumn(PhoenixConnection conn, long scn, String columnDef) throws SQLException {
-        String url = conn.getURL();
-        Properties props = conn.getClientInfo();
-        PMetaData metaData = conn.getMetaDataCache();
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
         PhoenixConnection metaConnection = null;
-
         Statement stmt = null;
         try {
-            metaConnection = new PhoenixConnection(conn.getQueryServices(), url, props, metaData);
+            metaConnection = new PhoenixConnection(conn.getQueryServices(), conn, scn);
             try {
                 stmt = metaConnection.createStatement();
                 stmt.executeUpdate("ALTER TABLE SYSTEM.\"TABLE\" ADD IF NOT EXISTS " + columnDef);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index f013244..b0a8c9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -27,9 +27,13 @@ import co.cask.tephra.TransactionFailureException;
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 
 public class TransactionUtil {
@@ -38,10 +42,14 @@ public class TransactionUtil {
     
     private static final TransactionCodec codec = new TransactionCodec();
     
-    public static long translateMillis(long serverTimeStamp) {
+    public static long convertToNanoseconds(long serverTimeStamp) {
         return serverTimeStamp * 1000000;
     }
     
+    public static long convertToMillisecods(Long serverTimeStamp) {
+        return serverTimeStamp / 1000000;
+    }
+    
     public static byte[] encodeTxnState(Transaction txn) throws SQLException {
         try {
             return codec.encode(txn);
@@ -72,4 +80,34 @@ public class TransactionUtil {
     	// Conflict detection is not needed for tables with write-once/append-only data
     	return new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
     }
+    
+	public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, Long defaultResolvedTimestamp) {
+		Transaction transaction = connection.getMutationState().getTransaction();
+		Long scn = connection.getSCN();
+	    return scn != null ?  scn : (isTransactional && transaction!=null) ? convertToMillisecods(transaction.getReadPointer()) : defaultResolvedTimestamp;
+	}
+
+	public static long getResolvedTime(PhoenixConnection connection, MetaDataMutationResult result) {
+		PTable table = result.getTable();
+		boolean isTransactional = table!=null && table.isTransactional();
+		return getResolvedTimestamp(connection, isTransactional, result.getMutationTime());
+	}
+
+	public static long getTableTimestamp(PhoenixConnection connection, MetaDataMutationResult result) {
+		PTable table = result.getTable();
+		Transaction transaction = connection.getMutationState().getTransaction();
+		boolean transactional = table!=null && table.isTransactional();
+		return  (transactional && transaction!=null) ? convertToMillisecods(transaction.getReadPointer()) : result.getMutationTime();
+	}
+
+	public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, Long mutationTime) throws SQLException {
+		Long timestamp = mutationTime;
+		MutationState mutationState = connection.getMutationState();
+		if (transactional && mutationState.getTransaction()==null && connection.getSCN()==null) {
+			mutationState.startTransaction();
+			timestamp = convertToMillisecods(mutationState.getTransaction().getReadPointer());
+			connection.commit();
+		}
+		return timestamp;
+	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 7c36245..fbaa01e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -151,7 +151,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             String query = "CREATE TABLE t1 (k integer not null primary key, a.k decimal, b.k decimal)";
             conn.createStatement().execute(query);
             PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-            PColumn c = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), "T1")).getColumn("K");
+            PColumn c = pconn.getTable(new PTableKey(pconn.getTenantId(), "T1")).getColumn("K");
             assertTrue(SchemaUtil.isPKColumn(c));
         } finally {
             conn.close();
@@ -1162,7 +1162,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
     
     private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        assertEquals(expectedValue, pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
+        assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
index 7a0bac6..5b457e7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
@@ -31,20 +31,20 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 
 public class ViewCompilerTest extends BaseConnectionlessQueryTest {
     @Test
     public void testViewTypeCalculation() throws Exception {
-        assertViewType(new String[] {
+        assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] {
             "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 = 1 AND k2 = 'foo'",
             "CREATE VIEW v2 AS SELECT * FROM t WHERE k2 = 'foo'",
             "CREATE VIEW v3 AS SELECT * FROM t WHERE v = 'bar'||'bas'",
             "CREATE VIEW v4 AS SELECT * FROM t WHERE 'bar'=v and 5+3/2 = k1",
         }, ViewType.UPDATABLE);
-        assertViewType(new String[] {
+        assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] {
                 "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 < 1 AND k2 = 'foo'",
                 "CREATE VIEW v2 AS SELECT * FROM t WHERE substr(k2,0,3) = 'foo'",
                 "CREATE VIEW v3 AS SELECT * FROM t WHERE v = TO_CHAR(CURRENT_DATE())",
@@ -52,28 +52,27 @@ public class ViewCompilerTest extends BaseConnectionlessQueryTest {
             }, ViewType.READ_ONLY);
     }
     
-    public void assertViewType(String[] views, ViewType viewType) throws Exception {
+    public void assertViewType(String[] viewNames, String[] viewDDLs, ViewType viewType) throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
         String ct = "CREATE TABLE t (k1 INTEGER NOT NULL, k2 VARCHAR, v VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2))";
         conn.createStatement().execute(ct);
         
-        for (String view : views) {
-            conn.createStatement().execute(view);
+        for (String viewDDL : viewDDLs) {
+            conn.createStatement().execute(viewDDL);
         }
         
         StringBuilder buf = new StringBuilder();
         int count = 0;
-        for (PTable table : conn.getMetaDataCache()) {
-            if (table.getType() == PTableType.VIEW) {
-                assertEquals(viewType, table.getViewType());
-                conn.createStatement().execute("DROP VIEW " + table.getName().getString());
-                buf.append(' ');
-                buf.append(table.getName().getString());
-                count++;
-            }
+        for (String view : viewNames) {
+        	PTable table = conn.getTable(new PTableKey(null, view));
+            assertEquals(viewType, table.getViewType());
+            conn.createStatement().execute("DROP VIEW " + table.getName().getString());
+            buf.append(' ');
+            buf.append(table.getName().getString());
+            count++;
         }
-        assertEquals("Expected " + views.length + ", but got " + count + ":"+ buf.toString(), views.length, count);
+        assertEquals("Expected " + viewDDLs.length + ", but got " + count + ":"+ buf.toString(), viewDDLs.length, count);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index 29e14bf..cfd76bc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.SortedMap;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.end2end.Shadower;
@@ -646,7 +647,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
         }
         stmt.execute();
         
-        final PTable table = conn.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC"));
+        final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC"));
         GuidePostsInfo info = new GuidePostsInfo(0,Collections.<byte[]> emptyList(), 0l);
         for (byte[] gp : guidePosts) {
             info.addGuidePost(gp, 1000);
@@ -670,7 +671,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest {
                 return table.getTimeStamp()+1;
             }
         });
-        conn.unwrap(PhoenixConnection.class).addTable(tableWithStats);
+        conn.unwrap(PhoenixConnection.class).addTable(tableWithStats, System.currentTimeMillis());
 
         String query = "SELECT count(1) cnt,\n" + 
                 "       coalesce(SUM(impressions), 0.0) AS \"impressions\",\n" + 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 6e1c28f..e941ceb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -100,8 +100,8 @@ public class IndexMaintainerTest  extends BaseConnectionlessQueryTest {
         try {
             conn.createStatement().execute("CREATE INDEX idx ON " + fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : "INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : indexProps));
             PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-            PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName));
-            PTable index = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(),fullIndexName));
+            PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+            PTable index = pconn.getTable(new PTableKey(pconn.getTenantId(),fullIndexName));
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             table.getIndexMaintainers(ptr, pconn);
             List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index abaaeb5..452ea4d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -123,7 +123,7 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
         PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, props).unwrap(PhoenixConnection.class);
         try {
-            PTable table = conn.getMetaDataCache().getTable(new PTableKey(null, ATABLE_NAME));
+            PTable table = conn.getTable(new PTableKey(null, ATABLE_NAME));
             ATABLE = table;
             ORGANIZATION_ID = new ColumnRef(new TableRef(table), table.getColumn("ORGANIZATION_ID").getPosition()).newColumnExpression();
             ENTITY_ID = new ColumnRef(new TableRef(table), table.getColumn("ENTITY_ID").getPosition()).newColumnExpression();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 3d6cb0c..da965c8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -142,6 +142,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKClientServices;
@@ -498,10 +499,9 @@ public abstract class BaseTest {
         config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
         config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
         config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
         config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
         config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 600);
-//        config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, ConnectionInfo.getZookeeperConnectionString(getUrl()));
-//        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp");
 
         ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
         zkClient = ZKClientServices.delegate(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index cee1054..21e1f62 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -105,7 +105,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
-        PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME));
         TableRef tableRef = new TableRef(table);
         List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
         List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
index 9379ef3..405a73c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
@@ -32,7 +32,7 @@ public class PMetaDataImplTest {
     
     private static PMetaData addToTable(PMetaData metaData, String name, int size) throws SQLException {
         PTable table = new PSizedTable(new PTableKey(null,name), size);
-        return metaData.addTable(table);
+        return metaData.addTable(table, System.currentTimeMillis());
     }
     
     private static PMetaData removeFromTable(PMetaData metaData, String name) throws SQLException {
@@ -40,7 +40,7 @@ public class PMetaDataImplTest {
     }
     
     private static PTable getFromTable(PMetaData metaData, String name) throws TableNotFoundException {
-        return metaData.getTable(new PTableKey(null,name));
+        return metaData.getTableRef(new PTableKey(null,name)).getTable();
     }
     
     private static void assertNames(PMetaData metaData, String... names) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
index bcd08f0..6977103 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
@@ -56,7 +56,7 @@ public class RowKeySchemaTest  extends BaseConnectionlessQueryTest  {
         String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + "))  " + (dataProps.isEmpty() ? "" : dataProps) );
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
         conn.close();
         StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName  + " VALUES(");
         for (int i = 0; i < values.length; i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
index 23ec4bf..7ab72d6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
@@ -52,7 +52,7 @@ public class RowKeyValueAccessorTest  extends BaseConnectionlessQueryTest  {
         String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + "))  " + (dataProps.isEmpty() ? "" : dataProps) );
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
         conn.close();
         StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName  + " VALUES(");
         for (int i = 0; i < values.length; i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 9fbb8c9..bead2df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -25,14 +25,17 @@ import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PAR
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -567,5 +570,27 @@ public class TestUtil {
         analyzeTable(conn, tableName);
         conn.close();
     }
+    
+    /**
+     * Binds the six PK-shaped columns (VARCHAR, CHAR, INTEGER, BIGINT, DECIMAL, DATE) of a
+     * prepared UPSERT statement to values derived deterministically from {@code i}, so the
+     * same row can later be verified with validateRowKeyColumns for the same index.
+     *
+     * @param stmt prepared UPSERT statement whose parameters 1-6 match the types above
+     * @param i row index; every column value encodes it, and the DATE column is
+     *          2015-01-01 plus (i - 1) days
+     * @throws SQLException if any parameter cannot be bound
+     */
+    public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException {
+        // insert row
+        stmt.setString(1, "varchar" + String.valueOf(i));
+        stmt.setString(2, "char" + String.valueOf(i));
+        stmt.setInt(3, i);
+        stmt.setLong(4, i);
+        stmt.setBigDecimal(5, new BigDecimal(i*0.5d));
+        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+        stmt.setDate(6, date);
+    }
+	
+    /**
+     * Advances {@code rs} to the next row and asserts that all six columns hold exactly the
+     * values that setRowKeyColumns bound for index {@code i}.
+     *
+     * @param rs result set positioned before the expected row
+     * @param i row index the expected column values are derived from
+     * @throws SQLException if a column cannot be read
+     */
+    public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
+        assertTrue(rs.next());
+        // JUnit contract is assertEquals(expected, actual); keep that order so a
+        // mismatch reports "expected:<varcharN> but was:<...>" rather than the reverse.
+        assertEquals("varchar" + String.valueOf(i), rs.getString(1));
+        assertEquals("char" + String.valueOf(i), rs.getString(2));
+        assertEquals(i, rs.getInt(3));
+        // Column 4 was bound with setLong, so read it back as a long, not an int.
+        assertEquals((long) i, rs.getLong(4));
+        // Intentionally the same double-based BigDecimal expression used when binding,
+        // so both sides carry the identical (possibly inexact) representation.
+        assertEquals(new BigDecimal(i*0.5d), rs.getBigDecimal(5));
+        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+        assertEquals(date, rs.getDate(6));
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 731955e..6c248b1 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -165,6 +165,13 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
     </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <version>${tephra.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-pig/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pig/pom.xml b/phoenix-pig/pom.xml
index ea83513..530236c 100644
--- a/phoenix-pig/pom.xml
+++ b/phoenix-pig/pom.xml
@@ -143,6 +143,13 @@
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <version>${tephra.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ace160..afb6df3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <htrace.version>2.04</htrace.version>
     <collections.version>3.2.1</collections.version>
     <jodatime.version>2.3</jodatime.version>
+    <tephra.version>0.6.1</tephra.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>


Mime
View raw message