phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [4/5] PHOENIX-180 Use stats to guide query parallelization (Ramkrishna S. Vasudevan)
Date Mon, 22 Sep 2014 21:36:57 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
index b269cd1..3703af9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java
@@ -53,12 +53,13 @@ public class SaltedIndexIT extends BaseIndexIT {
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        props.put(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, Integer.toString(1));
         // Forces server cache to be used
         props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
         // Drop the HBase table metadata for this test
         props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        // Don't put guideposts in
+        // TODO: overrides are not being correctly propagated into configs, but when
+        // they are we can add this: props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(10000000));
         // Must update config before starting server
         setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator()));
     }
@@ -98,6 +99,12 @@ public class SaltedIndexIT extends BaseIndexIT {
         testMutableTableIndexMaintanence(null, null);
     }
 
+    private void assertQueryPlan(String expectedPrefix, String expectedPostfix, String actual) {
+        assertTrue("Expected query plan to start with [" + expectedPrefix + "], but got [" + actual + "]",
+                actual.startsWith(expectedPrefix));
+        assertTrue("Expected query plan to end with [" + expectedPostfix + "], but got [" + actual + "]", actual.endsWith(expectedPostfix));;
+    }
+    
     private void testMutableTableIndexMaintanence(Integer tableSaltBuckets, Integer indexSaltBuckets) throws Exception {
         String query;
         ResultSet rs;
@@ -123,7 +130,9 @@ public class SaltedIndexIT extends BaseIndexIT {
         stmt.setString(2, "y");
         stmt.execute();
         conn.commit();
-        
+        stmt = conn.prepareStatement("ANALYZE "+DATA_TABLE_FULL_NAME);
+        stmt.execute();
+
         query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
         rs = conn.createStatement().executeQuery(query);
         assertTrue(rs.next());
@@ -177,13 +186,15 @@ public class SaltedIndexIT extends BaseIndexIT {
         assertFalse(rs.next());
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = tableSaltBuckets == null ? 
-                "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" +
+                "CLIENT PARALLEL 3-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" +
                 "    SERVER SORTED BY [V]\n" + 
                 "CLIENT MERGE SORT" :
-                    "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + 
+                    "CLIENT PARALLEL 4-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + 
                     "    SERVER SORTED BY [V]\n" + 
                     "CLIENT MERGE SORT";
-        assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
+        String explainPlan2 = QueryUtil.getExplainPlan(rs);
+        String prefix = "CLIENT PARALLEL ";
+        assertQueryPlan(prefix, expectedPlan.substring(prefix.length()+1),explainPlan2);
         
         // Will use data table now, since there's a LIMIT clause and
         // we're able to optimize out the ORDER BY, unless the data
@@ -199,15 +210,16 @@ public class SaltedIndexIT extends BaseIndexIT {
         assertFalse(rs.next());
         rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         expectedPlan = tableSaltBuckets == null ? 
-             "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" +
+             "CLIENT PARALLEL 3-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" +
              "    SERVER FILTER BY V >= 'x'\n" + 
              "    SERVER 2 ROW LIMIT\n" + 
              "CLIENT 2 ROW LIMIT" :
-                 "CLIENT PARALLEL 3-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" +
+                 "CLIENT PARALLEL 6-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" +
                  "    SERVER FILTER BY V >= 'x'\n" + 
                  "    SERVER 2 ROW LIMIT\n" + 
                  "CLIENT MERGE SORT\n" + 
                  "CLIENT 2 ROW LIMIT";
-        assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs));
+        String explainPlan = QueryUtil.getExplainPlan(rs);
+        assertQueryPlan(prefix, expectedPlan.substring(prefix.length()+1),explainPlan);
     }
 }
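
The test now relies on the new ANALYZE statement to collect guideposts, and it checks only a prefix and a postfix of the EXPLAIN output because the N in "CLIENT PARALLEL N-WAY" depends on how many guideposts were gathered. A minimal client-side sketch of that flow (the connection URL and table name below are placeholders, not taken from this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class AnalyzeExplainSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL and table; substitute real ones.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                // Collect guideposts; query parallelization is now derived from
                // them rather than from the old target/max concurrency settings.
                conn.createStatement().execute("ANALYZE MY_SCHEMA.MY_TABLE");
                ResultSet rs = conn.createStatement().executeQuery(
                        "EXPLAIN SELECT * FROM MY_SCHEMA.MY_TABLE WHERE V >= 'x'");
                while (rs.next()) {
                    System.out.println(rs.getString(1)); // e.g. CLIENT PARALLEL 3-WAY FULL SCAN ...
                }
            } finally {
                conn.close();
            }
        }
    }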

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java
index e6fe7d3..690b15c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java
@@ -23,10 +23,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
@@ -94,12 +96,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT {
             stmt.setInt(2, 1);
             stmt.execute();
             conn.commit();
-            
             query = "UPSERT INTO target(pk, col) SELECT pk, col from source";
             stmt = conn.prepareStatement(query);
             stmt.execute();
             conn.commit();
-            
+            analyzeTable(conn, "source");
+            analyzeTable(conn, "target");
             query = "SELECT * FROM target";
             stmt = conn.prepareStatement(query);
             ResultSet rs = stmt.executeQuery();
@@ -111,6 +113,11 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
+        String query = "ANALYZE " + tableName;
+        conn.createStatement().execute(query);
+    }
 
     @Test
     public void testUpsertSaltedTableIntoSaltedTable() throws Exception {
@@ -188,12 +195,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(false);
         try {
-            String ddl = "CREATE TABLE IF NOT EXISTS source" + 
+            String ddl = "CREATE TABLE IF NOT EXISTS source1" + 
                     " (pk1 varchar NULL, pk2 varchar NULL, pk3 integer NOT NULL, col1 INTEGER" + 
                     " CONSTRAINT pk PRIMARY KEY (pk1, pk2, pk3)) SALT_BUCKETS=4";
             createTestTable(getUrl(), ddl);
             
-            String query = "UPSERT INTO source(pk1, pk2, pk3, col1) VALUES(?,?,?,?)";
+            String query = "UPSERT INTO source1(pk1, pk2, pk3, col1) VALUES(?,?,?,?)";
             PreparedStatement stmt = conn.prepareStatement(query);
             stmt.setString(1, "1");
             stmt.setString(2, "2");
@@ -203,12 +210,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT {
             conn.commit();
             
             conn.setAutoCommit(true);
-            query = "UPSERT INTO source(pk3, col1, pk1) SELECT pk3+1, col1+1, pk2 from source";
+            query = "UPSERT INTO source1(pk3, col1, pk1) SELECT pk3+1, col1+1, pk2 from source1";
             stmt = conn.prepareStatement(query);
             stmt.execute();
             conn.commit();
-            
-            query = "SELECT col1 FROM source";
+            analyzeTable(conn, "source1");
+            query = "SELECT col1 FROM source1";
             stmt = conn.prepareStatement(query);
             ResultSet rs = stmt.executeQuery();
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index a900628..f4c43d3 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -1,6 +1,4 @@
 /**
- * Copyright 2010 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -105,12 +103,12 @@ tokens
     MAXVALUE='maxvalue';
     CYCLE='cycle';
     CASCADE='cascade';
+    ANALYZE='analyze';
 }
 
 
 @parser::header {
 /**
- * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -357,6 +355,7 @@ non_select_node returns [BindableStatement ret]
     |   s=alter_table_node
     |	s=create_sequence_node
     |	s=drop_sequence_node
+    |   s=update_statistics_node
     |   s=explain_node) { contextStack.pop();  $ret = s; }
     ;
     
@@ -493,6 +492,11 @@ alter_table_node returns [AlterTableStatement ret]
         { PTableType tt = v==null ? (QueryConstants.SYSTEM_SCHEMA_NAME.equals(t.getSchemaName()) ? PTableType.SYSTEM : PTableType.TABLE) : PTableType.VIEW; ret = ( c == null ? factory.addColumn(factory.namedTable(null,t), tt, d, ex!=null, p) : factory.dropColumn(factory.namedTable(null,t), tt, c, ex!=null) ); }
     ;
 
+update_statistics_node returns [UpdateStatisticsStatement ret]
+	:   ANALYZE t=from_table_name
+		{ret = factory.updateStatistics(factory.namedTable(null, t));}
+	;
+
 prop_name returns [String ret]
     :   p=identifier {$ret = SchemaUtil.normalizeIdentifier(p); }
     ;
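
The new rule above routes ANALYZE <table> through factory.updateStatistics(factory.namedTable(null, t)), so the parser should hand back an UpdateStatisticsStatement. A small sketch of that expectation, assuming SQLParser.parseStatement() as the entry point and an arbitrary example table name:

    import org.apache.phoenix.parse.BindableStatement;
    import org.apache.phoenix.parse.SQLParser;
    import org.apache.phoenix.parse.UpdateStatisticsStatement;

    public class ParseAnalyzeSketch {
        public static void main(String[] args) throws Exception {
            // "my_table" is only an example; any resolvable table name works here.
            BindableStatement stmt = new SQLParser("ANALYZE my_table").parseStatement();
            System.out.println(stmt instanceof UpdateStatisticsStatement); // expected: true
        }
    }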

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index a36db0b..fcef0ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -72,10 +72,13 @@ public class GlobalCache extends TenantCacheImpl {
             synchronized(this) {
                 result = metaDataCache;
                 if(result == null) {
+                    long maxTTL = Math.min(config.getLong(
+                            QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                            QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS), config.getLong(
+                            QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS));
                     long maxSize = config.getLong(QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB,
                             QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE);
-                    long maxTTL = config.getLong(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
-                            QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS);
                     metaDataCache = result = CacheBuilder.newBuilder()
                             .maximumWeight(maxSize)
                             .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 48970c8..5f04fa0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -40,7 +40,6 @@ import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.schema.AmbiguousColumnException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9af606e..c05ef30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -59,11 +59,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -104,8 +105,12 @@ import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PhoenixArray;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stat.PTableStats;
+import org.apache.phoenix.schema.stat.PTableStatsImpl;
+import org.apache.phoenix.schema.stat.StatisticsUtils;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -445,11 +450,85 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
             }
         }
-        
-        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, 
-                tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId);
+        PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName(
+                schemaName.getString(), tableName.getString())) : physicalTables.get(0);
+        PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns);
+        return PTableImpl
+                .makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName,
+                        saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows,
+                        physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType,
+                        viewIndexId, stats);
+    }
+    
+    private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns) throws IOException {
+        List<PName> family = Lists.newArrayListWithExpectedSize(columns.size());
+        for (PColumn column : columns) {
+            PName familyName = column.getFamilyName();
+            if (familyName != null) {
+                family.add(familyName);
+            }
+        }
+        HTable statsHTable = null;
+        try {
+            // Can we do a new HTable instance here? Or get it from a pool or cache of these instances?
+            statsHTable = new HTable(getEnvironment().getConfiguration(), PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+            Scan s = new Scan();
+            if (tableNameBytes != null) {
+                // Check for an efficient way here
+                s.setStartRow(tableNameBytes);
+                s.setStopRow(ByteUtil.nextKey(tableNameBytes));
+            }
+            ResultScanner scanner = statsHTable.getScanner(s);
+            Result result = null;
+            byte[] fam = null;
+            List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size());
+            TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
+            while ((result = scanner.next()) != null) {
+                KeyValue[] kvs = result.raw();
+                for(KeyValue kv : kvs) {
+                    // For now collect only guide posts
+                    if (Bytes.equals(kv.getQualifier(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES)) {
+                        byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, kv.getRow());
+                        if (fam == null) {
+                            fam = cfInCell;
+                        } else if (!Bytes.equals(fam, cfInCell)) {
+                            // Store the guide posts accumulated so far under the previous column family
+                            guidePostsPerCf.put(fam, guidePosts);
+                            guidePosts = new ArrayList<byte[]>();
+                            fam = cfInCell;
+                        }
+                        byte[] guidePostVal = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(),
+                                kv.getValueLength()).copyBytesIfNecessary();
+                        PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal);
+                        if (array != null && array.getDimensions() != 0) {
+                            for (int j = 0; j < array.getDimensions(); j++) {
+                                byte[] gp = array.toBytes(j);
+                                if (gp.length != 0) {
+                                    guidePosts.add(gp);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            if (fam != null) {
+                // Store the remaining guide posts under the last column family
+                guidePostsPerCf.put(fam, guidePosts);
+            }
+            return new PTableStatsImpl(guidePostsPerCf);
+        } catch (Exception e) {
+            if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) {
+                logger.warn("Stats table not yet online", e);
+            } else {
+                throw new IOException(e);
+            }
+        } finally {
+            if (statsHTable != null) {
+                statsHTable.close();
+            }
+        }
+        return PTableStatsImpl.NO_STATS;
     }
-
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
             return null;
@@ -1337,11 +1416,41 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
             return results.size() > 0 && !allViewsNotInSingleRegion;
         }
         
         /**
          * Returns true if the table has views and they are all NOT in the same HBase region.
          */
         private boolean allViewsInMultipleRegions() {
             return results.size() > 0 && allViewsNotInSingleRegion;
         }
     }
+    
+    @Override
+    public void clearCacheForTable(byte[] tenantId, byte[] schema, byte[] tableName, final long clientTS)
+            throws IOException {
+        byte[] tableKey = SchemaUtil.getTableKey(tenantId, schema, tableName);
+        ImmutableBytesPtr key = new ImmutableBytesPtr(tableKey);
+        try {
+            PTable table = doGetTable(tableKey, clientTS);
+            if (table != null) {
+                Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(getEnvironment()).getMetaDataCache();
+                // Add +1 to the ts
+                // TODO : refactor doGetTable() to do this - optionally increment the timestamp
+                // TODO : clear metadata if it is spread across multiple region servers
+                long ts = table.getTimeStamp() + 1;
+                // Here we could add an empty put
+                HRegion region = getEnvironment().getRegion();
+                List<Mutation> mutations = new ArrayList<Mutation>();
+                Put p = new Put(tableKey);
+                p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ts, ByteUtil.EMPTY_BYTE_ARRAY);
+                mutations.add(p);
+                region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
+                metaDataCache.invalidate(key);
+            }
+        } catch (Throwable t) {
+            // We could still invalidate it
+            logger.error("clearCacheForTable failed to update the latest ts ", t);
+            throw new IOException(t);
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 351662b..66fb1ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -267,6 +267,9 @@ public interface MetaDataProtocol extends CoprocessorProtocol {
      */
     void clearCache();
     
+    void clearCacheForTable(final byte[] tenantID, final byte[] schema, final byte[] tableName, final long clientTS)
+            throws IOException;
+    
     /**
      * Get the version of the server-side HBase and phoenix.jar. Used when initially connecting
      * to a cluster to ensure that the client and server jars are compatible.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 768785b..9864218 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -43,7 +43,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
@@ -271,7 +270,6 @@ public class MutationState implements SQLCloseable {
     private long[] validate() throws SQLException {
         int i = 0;
         Long scn = connection.getSCN();
-        PName tenantId = connection.getTenantId();
         MetaDataClient client = new MetaDataClient(connection);
         long[] timeStamps = new long[this.mutations.size()];
         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
index 92756a5..7e76234 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java
@@ -17,30 +17,30 @@
  */
 package org.apache.phoenix.expression.function;
 
+import static org.apache.phoenix.schema.PDataType.DECIMAL;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.KeyPart;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 import com.google.common.collect.Lists;
-import java.util.Collections;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.phoenix.compile.KeyPart;
-import static org.apache.phoenix.expression.function.ScalarFunction.evaluateExpression;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PColumn;
-import static org.apache.phoenix.schema.PDataType.DECIMAL;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
index 7fb99ff..a54b5b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -18,31 +18,28 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.query.StatsManager;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 
 /**
@@ -54,12 +51,11 @@ import org.apache.phoenix.util.ReadOnlyProps;
  */
 public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter {
 
-    protected final int targetConcurrency;
-    protected final int maxConcurrency;
-    protected final int maxIntraRegionParallelization;
+    protected final long guidePostsDepth;
     protected final StatementContext context;
     protected final TableRef tableRef;
 
+    private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class);
     public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) {
         return new DefaultParallelIteratorRegionSplitter(context, table, hintNode);
     }
@@ -68,22 +64,16 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
         this.context = context;
         this.tableRef = table;
         ReadOnlyProps props = context.getConnection().getQueryServices().getProps();
-        this.targetConcurrency = props.getInt(QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB,
-                QueryServicesOptions.DEFAULT_TARGET_QUERY_CONCURRENCY);
-        this.maxConcurrency = props.getInt(QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB,
-                QueryServicesOptions.DEFAULT_MAX_QUERY_CONCURRENCY);
-        Preconditions.checkArgument(targetConcurrency >= 1, "Invalid target concurrency: " + targetConcurrency);
-        Preconditions.checkArgument(maxConcurrency >= targetConcurrency , "Invalid max concurrency: " + maxConcurrency);
-        this.maxIntraRegionParallelization = hintNode.hasHint(Hint.NO_INTRA_REGION_PARALLELIZATION) ? 1 : props.getInt(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB,
-                QueryServicesOptions.DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
-        Preconditions.checkArgument(maxIntraRegionParallelization >= 1 , "Invalid max intra region parallelization: " + maxIntraRegionParallelization);
+        this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
     }
 
     // Get the mapping between key range and the regions that contains them.
     protected List<HRegionLocation> getAllRegions() throws SQLException {
         Scan scan = context.getScan();
         PTable table = tableRef.getTable();
-        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes());
+        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
+                .getAllTableRegions(table.getPhysicalName().getBytes());
         // If we're not salting, then we've already intersected the minMaxRange with the scan range
         // so there's nothing to do here.
         return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
@@ -107,7 +97,8 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
         regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() {
             @Override
             public boolean apply(HRegionLocation location) {
-                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey());
+                KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location
+                        .getRegionInfo().getEndKey());
                 return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE;
             }
         });
@@ -115,109 +106,66 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
     }
 
     protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
-        if (regions.isEmpty()) {
-            return Collections.emptyList();
-        }
-        
-        StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager();
-        // the splits are computed as follows:
-        //
-        // let's suppose:
-        // t = target concurrency
-        // m = max concurrency
-        // r = the number of regions we need to scan
-        //
-        // if r >= t:
-        //    scan using regional boundaries
-        // elif r > t/2:
-        //    split each region in s splits such that:
-        //    s = max(x) where s * x < m
-        // else:
-        //    split each region in s splits such that:
-        //    s = max(x) where s * x < t
-        //
-        // The idea is to align splits with region boundaries. If rows are not evenly
-        // distributed across regions, using this scheme compensates for regions that
-        // have more rows than others, by applying tighter splits and therefore spawning
-        // off more scans over the overloaded regions.
-        int splitsPerRegion = regions.size() >= targetConcurrency ? 1 : (regions.size() > targetConcurrency / 2 ? maxConcurrency : targetConcurrency) / regions.size();
-        splitsPerRegion = Math.min(splitsPerRegion, maxIntraRegionParallelization);
-        // Create a multi-map of ServerName to List<KeyRange> which we'll use to round robin from to ensure
-        // that we keep each region server busy for each query.
-        ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);;
-        if (splitsPerRegion == 1) {
-            for (HRegionLocation region : regions) {
-                keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
-            }
+        if (regions.isEmpty()) { return Collections.emptyList(); }
+        Scan scan = context.getScan();
+        PTable table = this.tableRef.getTable();
+        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
+        List<byte[]> gps = Lists.newArrayList();
+
+        if (table.getColumnFamilies().isEmpty()) {
+            // For sure we can get the defaultCF from the table
+            gps = table.getGuidePosts();
         } else {
-            // Maintain bucket for each server and then returns KeyRanges in round-robin
-            // order to ensure all servers are utilized.
-            for (HRegionLocation region : regions) {
-                byte[] startKey = region.getRegionInfo().getStartKey();
-                byte[] stopKey = region.getRegionInfo().getEndKey();
-                boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0;
-                boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0;
-                /*
-                 * If lower/upper unbound, get the min/max key from the stats manager.
-                 * We use this as the boundary to split on, but we still use the empty
-                 * byte as the boundary in the actual scan (in case our stats are out
-                 * of date).
-                 */
-                if (lowerUnbound) {
-                    startKey = statsManager.getMinKey(tableRef);
-                    if (startKey == null) {
-                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
-                        continue;
-                    }
-                }
-                if (upperUnbound) {
-                    stopKey = statsManager.getMaxKey(tableRef);
-                    if (stopKey == null) {
-                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
-                        continue;
+            try {
+                if (scan.getFamilyMap().size() > 0) {
+                    if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+                        gps = table.getColumnFamily(defaultCF).getGuidePosts();
+                    } else { // Otherwise, just use first CF in use by scan
+                        gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
                     }
-                }
-                
-                byte[][] boundaries = null;
-                // Both startKey and stopKey will be empty the first time
-                if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
-                    // Bytes.split may return null if the key space
-                    // between start and end key is too small
-                    keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
                 } else {
-                    keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
-                    if (boundaries.length > 1) {
-                        for (int i = 1; i < boundaries.length-2; i++) {
-                            keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false));
-                        }
-                        keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false));
-                    }
+                    gps = table.getColumnFamily(defaultCF).getGuidePosts();
                 }
+            } catch (ColumnFamilyNotFoundException cfne) {
+                // Alter table does this
             }
+
         }
-        List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
-        // as documented for ListMultimap
-        Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values();
-        List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values);
-        // Randomize range order to ensure that we don't hit the region servers in the same order every time
-        // thus helping to distribute the load more evenly
-        Collections.shuffle(keyRangesList);
-        // Transpose values in map to get regions in round-robin server order. This ensures that
-        // all servers will be used to process the set of parallel threads available in our executor.
-        int i = 0;
-        boolean done;
-        do {
-            done = true;
-            for (int j = 0; j < keyRangesList.size(); j++) {
-                List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j);
-                if (i < keyRanges.size()) {
-                    splits.add(keyRanges.get(i));
-                    done = false;
+
+        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
+        byte[] currentKey = regions.get(0).getRegionInfo().getStartKey();
+        byte[] endKey = null;
+        int regionIndex = 0;
+        int guideIndex = 0;
+        int gpsSize = gps.size();
+        int regionSize = regions.size();
+        if (currentKey.length > 0) {
+            guideIndex = Collections.binarySearch(gps, currentKey, Bytes.BYTES_COMPARATOR);
+            guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1));
+        }
+        // Walk the regions in order and cut each one at the guide posts that fall inside it
+        while (regionIndex < regionSize) {
+            byte[] currentGuidePost;
+            currentKey = regions.get(regionIndex).getRegionInfo().getStartKey();
+            endKey = regions.get(regionIndex++).getRegionInfo().getEndKey();
+            while (guideIndex < gpsSize
+                    && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) {
+                KeyRange keyRange = KeyRange.getKeyRange(currentKey, currentGuidePost);
+                if (keyRange != KeyRange.EMPTY_RANGE) {
+                    guidePosts.add(keyRange);
                 }
+                currentKey = currentGuidePost;
+                guideIndex++;
+            }
+            KeyRange keyRange = KeyRange.getKeyRange(currentKey, endKey);
+            if (keyRange != KeyRange.EMPTY_RANGE) {
+                guidePosts.add(keyRange);
             }
-            i++;
-        } while (!done);
-        return splits;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("The captured guideposts are: " + guidePosts);
+        }
+        return guidePosts;
     }
 
     @Override
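
The rewritten genKeyRanges() above replaces the old concurrency-based splitting with a merge of region boundaries and guideposts. A self-contained sketch of that merge, using strings in place of row keys and ignoring the empty start/stop key handling done by the real code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class GuidePostSplitSketch {
        // Simplified model of genKeyRanges(): walk the regions in order and cut
        // each region's [start, end) range at every guide post inside it.
        static List<String[]> splits(List<String[]> regions, List<String> guidePosts) {
            List<String[]> ranges = new ArrayList<String[]>();
            int guideIndex = 0;
            for (String[] region : regions) {
                String current = region[0];
                String end = region[1];
                while (guideIndex < guidePosts.size() && guidePosts.get(guideIndex).compareTo(end) <= 0) {
                    String gp = guidePosts.get(guideIndex++);
                    if (current.compareTo(gp) < 0) {
                        ranges.add(new String[] { current, gp });
                        current = gp;
                    }
                }
                if (current.compareTo(end) < 0) {
                    ranges.add(new String[] { current, end });
                }
            }
            return ranges;
        }

        public static void main(String[] args) {
            // Two regions [a, m) and [m, z) with guide posts d, h and q.
            List<String[]> regions = Arrays.asList(new String[] { "a", "m" }, new String[] { "m", "z" });
            List<String> guidePosts = Arrays.asList("d", "h", "q");
            for (String[] r : splits(regions, guidePosts)) {
                System.out.println("[" + r[0] + ", " + r[1] + ")");
            }
            // Prints [a, d) [d, h) [h, m) [m, q) [q, z) -> a 5-way parallel scan
        }
    }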

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 987169a..cb7d492 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -105,6 +105,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final byte[] SYSTEM_CATALOG_TABLE_BYTES = Bytes.toBytes(SYSTEM_CATALOG_SCHEMA);
     public static final String SYSTEM_CATALOG_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE);
     public static final byte[] SYSTEM_CATALOG_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_CATALOG_SCHEMA_BYTES);
+    public static final String SYSTEM_STATS_TABLE = "STATS";
+    public static final byte[] SYSTEM_STATS_BYTES = Bytes.toBytes(SYSTEM_STATS_TABLE);
+    public static final String SYSTEM_STATS_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE);
+    public static final byte[] SYSTEM_STATS_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_STATS_BYTES);
     
     public static final String SYSTEM_CATALOG_ALIAS = "\"SYSTEM.TABLE\"";
 
@@ -112,6 +116,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final byte[] TABLE_NAME_BYTES = Bytes.toBytes(TABLE_NAME);
     public static final String TABLE_TYPE = "TABLE_TYPE";
     public static final byte[] TABLE_TYPE_BYTES = Bytes.toBytes(TABLE_TYPE);
+    public static final String PHYSICAL_NAME = "PHYSICAL_NAME";
+    public static final byte[] PHYSICAL_NAME_BYTES = Bytes.toBytes(PHYSICAL_NAME);
     
     public static final String COLUMN_FAMILY = "COLUMN_FAMILY";
     public static final String TABLE_CAT = "TABLE_CAT";
@@ -222,6 +228,16 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final String INDEX_DISABLE_TIMESTAMP = "INDEX_DISABLE_TIMESTAMP";
     public static final byte[] INDEX_DISABLE_TIMESTAMP_BYTES = Bytes.toBytes(INDEX_DISABLE_TIMESTAMP);
     
+    public static final String REGION_NAME = "REGION_NAME";
+    public static final byte[] REGION_NAME_BYTES = Bytes.toBytes(REGION_NAME);
+    public static final String GUIDE_POSTS = "GUIDE_POSTS";
+    public static final byte[] GUIDE_POSTS_BYTES = Bytes.toBytes(GUIDE_POSTS);
+    public static final String MIN_KEY = "MIN_KEY";
+    public static final byte[] MIN_KEY_BYTES = Bytes.toBytes(MIN_KEY);
+    public static final String MAX_KEY = "MAX_KEY";
+    public static final byte[] MAX_KEY_BYTES = Bytes.toBytes(MAX_KEY);
+    public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME";
+    public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME);
     private final PhoenixConnection connection;
     private final ResultSet emptyResultSet;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 45e6973..2840dca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UpdateStatisticsStatement;
 import org.apache.phoenix.parse.UpsertStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
@@ -644,6 +645,49 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
             };
         }
     }
+    
+    private static class ExecutableUpdateStatisticsStatement extends UpdateStatisticsStatement implements
+            CompilableStatement {
+
+        public ExecutableUpdateStatisticsStatement(NamedTableNode table) {
+            super(table);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            final StatementContext context = new StatementContext(stmt);
+            return new MutationPlan() {
+
+                @Override
+                public StatementContext getContext() {
+                    return context;
+                }
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("ANALYZE"));
+                }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.updateStatistics(ExecutableUpdateStatisticsStatement.this);
+                }
+            };
+        }
+
+    }
 
     private static class ExecutableAddColumnStatement extends AddColumnStatement implements CompilableStatement {
 
@@ -799,6 +843,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         public ExplainStatement explain(BindableStatement statement) {
             return new ExecutableExplainStatement(statement);
         }
+
+        @Override
+        public UpdateStatisticsStatement updateStatistics(NamedTableNode table) {
+            return new ExecutableUpdateStatisticsStatement(table);
+        }
     }
     
     static class PhoenixStatementParser extends SQLParser {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index f1e39eb..7032d83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -338,6 +338,10 @@ public class ParseNodeFactory {
     public DivideParseNode divide(List<ParseNode> children) {
         return new DivideParseNode(children);
     }
+    
+    public UpdateStatisticsStatement updateStatistics(NamedTableNode table) {
+      return new UpdateStatisticsStatement(table);
+    }
 
 
     public FunctionParseNode functionDistinct(String name, List<ParseNode> args) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
new file mode 100644
index 0000000..9eff74a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+public class UpdateStatisticsStatement extends SingleTableStatement {
+
+    public UpdateStatisticsStatement(NamedTableNode table) {
+        super(table, 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 5f43a63..a05acde 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -63,8 +63,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException;
 
-    public StatsManager getStatsManager();
-
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException;
 
     public PhoenixConnection connect(String url, Properties info) throws SQLException;
@@ -96,6 +94,9 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     void addConnection(PhoenixConnection connection) throws SQLException;
     void removeConnection(PhoenixConnection connection) throws SQLException;
 
+    long updateStatistics(KeyRange keyRange, byte[] tableName)
+            throws SQLException;
+
     /**
      * @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase.
      */
@@ -105,4 +106,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public boolean supportsFeature(Feature feature);
     
     public String getUserName();
+    public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException;
 }
\ No newline at end of file
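
A rough sketch of how a caller (for example MetaDataClient, not shown in this part of the patch) might drive the two new methods: collect stats across the table's full key range, then invalidate the server-side cache so the next table resolution re-reads the guideposts. The helper below and its wiring are hypothetical:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.query.ConnectionQueryServices;
    import org.apache.phoenix.query.KeyRange;
    import org.apache.phoenix.util.SchemaUtil;

    public class UpdateStatsSketch {
        static void analyze(ConnectionQueryServices services, String schema, String table) throws Exception {
            // Ask the region servers to (re)collect guide posts over the whole table.
            byte[] physicalName = Bytes.toBytes(SchemaUtil.getTableName(schema, table));
            KeyRange fullRange = KeyRange.getKeyRange(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
            services.updateStatistics(fullRange, physicalName);
            // Bump the table row's timestamp server-side and drop the cached PTable
            // so the new stats are picked up on the next table resolution.
            services.clearCacheForTable(HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes(schema),
                    Bytes.toBytes(table), System.currentTimeMillis());
        }
    }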

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 086dc0f..0676e86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -102,6 +102,9 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stat.StatisticsCollectorProtocol;
+import org.apache.phoenix.schema.stat.StatisticsCollectorResponse;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -118,6 +121,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.protobuf.ServiceException;
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -129,7 +133,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final ReadOnlyProps props;
     private final String userName;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
-    private final StatsManager statsManager;
 
     // Cache the latest meta data here for future connections
     // writes guarded by "latestMetaDataLock"
@@ -185,9 +188,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // TODO: should we track connection wide memory usage or just org-wide usage?
         // If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate
         this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY);
-        int statsUpdateFrequencyMs = this.getProps().getInt(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
-        int maxStatsAgeMs = this.getProps().getInt(QueryServices.MAX_STATS_AGE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_STATS_AGE_MS);
-        this.statsManager = new StatsManagerImpl(this, statsUpdateFrequencyMs, maxStatsAgeMs);
 
         // find the HBase version and use that to determine the KeyValueBuilder that should be used
         String hbaseVersion = VersionInfo.getVersion();
@@ -216,11 +216,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return this.statsManager;
-    }
-
-    @Override
     public HTableInterface getTable(byte[] tableName) throws SQLException {
         try {
             return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor());
@@ -275,42 +270,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 sqlE = e;
             } finally {
                 try {
-                    // Clear any client-side caches.
-                    statsManager.clearStats();
-                } catch (SQLException e) {
+                    childServices.clear();
+                    synchronized (latestMetaDataLock) {
+                        latestMetaData = null;
+                        latestMetaDataLock.notifyAll();
+                    }
+                    if (connection != null) connection.close();
+                } catch (IOException e) {
                     if (sqlE == null) {
-                        sqlE = e;
+                        sqlE = ServerUtil.parseServerException(e);
                     } else {
-                        sqlE.setNextException(e);
+                        sqlE.setNextException(ServerUtil.parseServerException(e));
                     }
                 } finally {
                     try {
-                        childServices.clear();
-                        synchronized (latestMetaDataLock) {
-                            latestMetaData = null;
-                            latestMetaDataLock.notifyAll();
-                        }
-                        if (connection != null) connection.close();
-                    } catch (IOException e) {
+                        super.close();
+                    } catch (SQLException e) {
                         if (sqlE == null) {
-                            sqlE = ServerUtil.parseServerException(e);
+                            sqlE = e;
                         } else {
-                            sqlE.setNextException(ServerUtil.parseServerException(e));
+                            sqlE.setNextException(e);
                         }
                     } finally {
-                        try {
-                            super.close();
-                        } catch (SQLException e) {
-                            if (sqlE == null) {
-                                sqlE = e;
-                            } else {
-                                sqlE.setNextException(e);
-                            }
-                        } finally {
-                            if (sqlE != null) {
-                                throw sqlE;
-                            }
-                        }
+                        if (sqlE != null) { throw sqlE; }
                     }
                 }
             }
@@ -540,6 +522,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
+            if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
+                descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
+            }
 
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
@@ -1310,6 +1295,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     ConnectionQueryServicesImpl.this, url, scnProps, newEmptyMetaData());
                             try {
                                 metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+                                // TODO: get this from a configuration
+                                metaConnection.createStatement().executeUpdate(
+                                        QueryConstants.CREATE_STATS_TABLE_METADATA);
                             } catch (NewerTableAlreadyExistsException ignore) {
                                 // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp.
                                 // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
@@ -1811,4 +1799,78 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         throw new IllegalStateException("Connection to the cluster is closed");
     }
 
+    @Override
+    public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
+        HTableInterface ht = null;
+        try {
+            ht = this.getTable(tableName);
+            long noOfRowsScanned = 0;
+            Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse> callable =
+                    new Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse>() {
+                @Override
+                public StatisticsCollectorResponse call(StatisticsCollectorProtocol instance) throws IOException {
+                    return instance.collectStat(keyRange);
+                }
+            };
+            Map<byte[], StatisticsCollectorResponse> result = ht.coprocessorExec(StatisticsCollectorProtocol.class,
+                    keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
+            for (StatisticsCollectorResponse response : result.values()) {
+                noOfRowsScanned += response.getRowsScanned();
+            }
+            return noOfRowsScanned;
+        } catch (ServiceException e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } catch (TableNotFoundException e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } catch (Throwable e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } finally {
+            if (ht != null) {
+                try {
+                    ht.close();
+                } catch (IOException e) {
+                    throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
+            final long clientTS) throws SQLException {
+        // clear the meta data cache for the table here
+        try {
+            SQLException sqlE = null;
+            HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+            try {
+                metaDataCoprocessorExec(tableKey,
+                        new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() {
+                    @Override
+                    public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException {
+                        instance.clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+                        // TODO: Should this really return a result? Consider returning null instead.
+                        return new MetaDataMutationResult();
+                    }
+                });
+
+            } catch (Throwable e) {
+                sqlE = new SQLException(e);
+            } finally {
+                try {
+                    htable.close();
+                } catch (IOException e) {
+                    if (sqlE == null) {
+                        sqlE = ServerUtil.parseServerException(e);
+                    } else {
+                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                    }
+                } finally {
+                    if (sqlE != null) { throw sqlE; }
+                }
+            }
+        } catch (Exception e) {
+            throw new SQLException(ServerUtil.parseServerException(e));
+        }
+    }
 }
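For readers following the patch, here is a minimal sketch of how the new code paths above would be exercised from a client. It assumes the ANALYZE statement introduced by this change ends up calling ConnectionQueryServices.updateStatistics() for the table's key range, which in turn invokes the StatisticsCollector coprocessor registered on the table descriptor above; the connection URL, class name, and table name are placeholders, not part of the patch.

import java.sql.Connection;
import java.sql.DriverManager;

public class AnalyzeSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; any Phoenix JDBC URL would be driven the same way.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            // ANALYZE asks each region of the table to scan its rows, write guideposts
            // into the stats table, and report how many rows it scanned; the client
            // sums the per-region counts (see updateStatistics above).
            conn.createStatement().execute("ANALYZE MY_SCHEMA.MY_TABLE");
        }
    }
}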

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 09f42aa..4e155e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -65,7 +65,6 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -116,30 +115,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return new StatsManager() {
-
-            @Override
-            public byte[] getMinKey(TableRef table) {
-                return HConstants.EMPTY_START_ROW;
-            }
-
-            @Override
-            public byte[] getMaxKey(TableRef table) {
-                return HConstants.EMPTY_END_ROW;
-            }
-
-            @Override
-            public void updateStats(TableRef table) throws SQLException {
-            }
-
-            @Override
-            public void clearStats() throws SQLException {
-            }
-        };
-    }
-
-    @Override
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
         return Collections.singletonList(new HRegionLocation(new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),"localhost",-1));
     }
@@ -207,6 +182,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
     }
 
+    @Override
+    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+        // Noop
+        return 0;
+    }
+    
+    @Override
+    public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+            throws SQLException {}
     // TODO: share this with ConnectionQueryServicesImpl
     @Override
     public void init(String url, Properties props) throws SQLException {
@@ -242,6 +226,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
                     // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
                     // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
                 }
+                try {
+                    // TODO: get this from a configuration
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the stats table already exists at this fixed timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only exists *after* this
+                    // fixed timestamp.
+                }
             } catch (SQLException e) {
                 sqlE = e;
             } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 0b6a399..fa01f09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -65,11 +65,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return getDelegate().getStatsManager();
-    }
-
-    @Override
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
         return getDelegate().getAllTableRegions(tableName);
     }
@@ -231,4 +226,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public String getUserName() {
         return getDelegate().getUserName();
     }
+
+    @Override
+    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+        return getDelegate().updateStatistics(keyRange, tableName);
+    }
+    
+    @Override
+    public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+            throws SQLException {
+        getDelegate().clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 1e128c2..f5ba490 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -34,23 +34,31 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
@@ -66,6 +74,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -77,8 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+
 import java.math.BigDecimal;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -110,6 +118,7 @@ public interface QueryConstants {
     
     public enum JoinType {INNER, LEFT_OUTER}
     public final static String SYSTEM_SCHEMA_NAME = "SYSTEM";
+    public final static byte[] SYSTEM_SCHEMA_NAME_BYTES = Bytes.toBytes(SYSTEM_SCHEMA_NAME);
     public final static String PHOENIX_METADATA = "table";
 
     public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
@@ -219,6 +228,22 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
     
+    public static final String CREATE_STATS_TABLE_METADATA =
+            "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
+            // PK columns
+            PHYSICAL_NAME + " VARCHAR NOT NULL," +
+            COLUMN_FAMILY + " VARCHAR," +
+            REGION_NAME + " VARCHAR," +
+            GUIDE_POSTS + " VARBINARY[]," +
+            MIN_KEY + " VARBINARY," +
+            MAX_KEY + " VARBINARY," +
+            LAST_STATS_UPDATE_TIME + " DATE, " +
+            "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY ("
+            + PHYSICAL_NAME + "," + COLUMN_FAMILY + "," + REGION_NAME + "))\n" +
+            HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+
     public static final String CREATE_SEQUENCE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +                                    
             TENANT_ID + " VARCHAR NULL," +
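As a readability aid, here is roughly what the new CREATE_STATS_TABLE_METADATA constant expands to once the metadata-column constants are substituted. The concrete identifier spellings (stats table name, PK constraint name, split policy class) are assumptions shown only in the comments; the snippet itself just prints the real constant and assumes phoenix-core is on the classpath.

public class StatsTableDdl {
    public static void main(String[] args) {
        // Approximate shape of the generated DDL (identifier spellings are assumptions):
        //   CREATE TABLE SYSTEM."STATS" (
        //       PHYSICAL_NAME VARCHAR NOT NULL,
        //       COLUMN_FAMILY VARCHAR,
        //       REGION_NAME VARCHAR,
        //       GUIDE_POSTS VARBINARY[],
        //       MIN_KEY VARBINARY,
        //       MAX_KEY VARBINARY,
        //       LAST_STATS_UPDATE_TIME DATE,
        //       CONSTRAINT <pk name> PRIMARY KEY (PHYSICAL_NAME, COLUMN_FAMILY, REGION_NAME))
        //   VERSIONS=<max metadata versions>, SPLIT_POLICY='<MetaDataSplitPolicy class>'
        System.out.println(org.apache.phoenix.query.QueryConstants.CREATE_STATS_TABLE_METADATA);
    }
}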

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index da35626..14fea16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -68,18 +68,14 @@ public interface QueryServices extends SQLCloseable {
     public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs";
     public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage";
     public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes";
-    public static final String TARGET_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.targetConcurrency";
-    public static final String MAX_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.maxConcurrency";
     public static final String DATE_FORMAT_ATTRIB = "phoenix.query.dateFormat";
     public static final String NUMBER_FORMAT_ATTRIB = "phoenix.query.numberFormat";
     public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.query.statsUpdateFrequency";
-    public static final String MAX_STATS_AGE_MS_ATTRIB = "phoenix.query.maxStatsAge";
     public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
     public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
-    public static final String MAX_INTRA_REGION_PARALLELIZATION_ATTRIB  = "phoenix.query.maxIntraRegionParallelization";
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";
     public static final String USE_INDEXES_ATTRIB  = "phoenix.query.useIndexes";
     public static final String IMMUTABLE_ROWS_ATTRIB  = "phoenix.mutate.immutableRows";
@@ -123,6 +119,7 @@ public interface QueryServices extends SQLCloseable {
     // Index will be partially re-built from index disable time stamp - following overlap time 
     public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB = 
         "phoenix.index.failure.handling.rebuild.overlap.time";
+    public static final String HISTOGRAM_BYTE_DEPTH_ATTRIB = "phoenix.guidepost.width";
     
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 78681a0..9a3284e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,16 +24,15 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
@@ -51,7 +50,6 @@ import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -74,7 +72,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 public class QueryServicesOptions {
 	public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
 	public static final int DEFAULT_THREAD_POOL_SIZE = 128;
-	public static final int DEFAULT_QUEUE_SIZE = 500;
+	public static final int DEFAULT_QUEUE_SIZE = 5000;
 	public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
 	public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
     public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp";
@@ -107,6 +105,7 @@ public class QueryServicesOptions {
     // latency and client-side spooling/buffering. Smaller means less initial
     // latency and less parallelization.
     public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999;
+    public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 1024 * 1024;
     
     // 
     // Spillable GroupBy - SPGBY prefix
@@ -167,13 +166,10 @@ public class QueryServicesOptions {
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
             .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE)
             .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
-            .setIfUnset(TARGET_QUERY_CONCURRENCY_ATTRIB, DEFAULT_TARGET_QUERY_CONCURRENCY)
-            .setIfUnset(MAX_QUERY_CONCURRENCY_ATTRIB, DEFAULT_MAX_QUERY_CONCURRENCY)
             .setIfUnset(DATE_FORMAT_ATTRIB, DEFAULT_DATE_FORMAT)
             .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
             .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
             .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
-            .setIfUnset(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION)
             .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)
             .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
             .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
@@ -185,6 +181,7 @@ public class QueryServicesOptions {
             .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
             .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
             .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
+            .setIfUnset(HISTOGRAM_BYTE_DEPTH_ATTRIB, DEFAULT_HISTOGRAM_BYTE_DEPTH)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -275,14 +272,6 @@ public class QueryServicesOptions {
         return set(SCAN_CACHE_SIZE_ATTRIB, scanFetchSize);
     }
     
-    public QueryServicesOptions setMaxQueryConcurrency(int maxQueryConcurrency) {
-        return set(MAX_QUERY_CONCURRENCY_ATTRIB, maxQueryConcurrency);
-    }
-
-    public QueryServicesOptions setTargetQueryConcurrency(int targetQueryConcurrency) {
-        return set(TARGET_QUERY_CONCURRENCY_ATTRIB, targetQueryConcurrency);
-    }
-    
     public QueryServicesOptions setDateFormat(String dateFormat) {
         return set(DATE_FORMAT_ATTRIB, dateFormat);
     }
@@ -291,6 +280,10 @@ public class QueryServicesOptions {
         return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
     }
     
+    public QueryServicesOptions setHistogramDepthBytes(int depth) {
+        return set(HISTOGRAM_BYTE_DEPTH_ATTRIB, depth);
+    }
+    
     public QueryServicesOptions setCallQueueRoundRobin(boolean isRoundRobin) {
         return set(CALL_QUEUE_PRODUCER_ATTRIB_NAME, isRoundRobin);
     }
@@ -303,10 +296,6 @@ public class QueryServicesOptions {
         return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
     }
     
-    public QueryServicesOptions setMaxIntraRegionParallelization(int maxIntraRegionParallelization) {
-        return set(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, maxIntraRegionParallelization);
-    }
-    
     public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) {
         return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable);
     }
@@ -375,11 +364,7 @@ public class QueryServicesOptions {
     public int getMutateBatchSize() {
         return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE);
     }
-    
-    public int getMaxIntraRegionParallelization() {
-        return config.getInt(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
-    }
-    
+
     public boolean isUseIndexes() {
         return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES);
     }
@@ -436,4 +421,7 @@ public class QueryServicesOptions {
         return set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, walEditCodec);
     }
 
+    public QueryServicesOptions setHistogramByteDepth(long byteDepth) {
+        return set(HISTOGRAM_BYTE_DEPTH_ATTRIB, byteDepth);
+    }
 }
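To make the new knob concrete, a hedged usage sketch: overriding phoenix.guidepost.width (HISTOGRAM_BYTE_DEPTH_ATTRIB) on a client connection instead of relying on DEFAULT_HISTOGRAM_BYTE_DEPTH (1 MB). The URL, class name, table name, and 10 MB value are placeholders, and whether a client-side override actually reaches the server-side StatisticsCollector depends on how the configuration is propagated to the region servers.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class GuidepostWidthSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // phoenix.guidepost.width: target number of table bytes between guideposts.
        // Smaller values mean more guideposts and finer-grained parallelization;
        // 10 MB here is an arbitrary illustration, not a recommendation.
        props.setProperty("phoenix.guidepost.width", Long.toString(10L * 1024 * 1024));
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
            // Subsequent ANALYZE runs (and the stats they produce) are governed by
            // whatever width is in effect where the collector runs.
            conn.createStatement().execute("ANALYZE MY_SCHEMA.MY_TABLE");
        }
    }
}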

