phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/2] git commit: PHOENIX-1390 Stats not updated on client after major compaction
Date Tue, 28 Oct 2014 19:44:46 GMT
PHOENIX-1390 Stats not updated on client after major compaction


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a8a023a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a8a023a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a8a023a

Branch: refs/heads/4.0
Commit: 7a8a023a3d2bdb694f02fc1560a0f5eb35294a96
Parents: 851f57a
Author: James Taylor <jtaylor@salesforce.com>
Authored: Tue Oct 28 12:44:37 2014 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Tue Oct 28 12:44:37 2014 -0700

----------------------------------------------------------------------
 .../end2end/BaseClientManagedTimeIT.java        | 15 +++-
 .../org/apache/phoenix/end2end/BaseQueryIT.java |  3 +-
 .../end2end/ClientTimeArithmeticQueryIT.java    | 43 +++++++++++
 .../phoenix/end2end/InMemoryOrderByIT.java      |  4 +-
 .../org/apache/phoenix/end2end/QueryIT.java     | 24 ++++--
 .../apache/phoenix/end2end/ReverseScanIT.java   |  2 +-
 .../org/apache/phoenix/end2end/SequenceIT.java  |  7 +-
 .../phoenix/end2end/SpooledOrderByIT.java       |  4 +-
 .../phoenix/end2end/StatsCollectorIT.java       | 55 +++++++++++++-
 .../apache/phoenix/end2end/UpsertSelectIT.java  |  4 +-
 .../phoenix/compile/ExpressionCompiler.java     |  2 +-
 .../coprocessor/MetaDataEndpointImpl.java       | 35 ++-------
 .../UngroupedAggregateRegionObserver.java       | 21 ++++--
 .../org/apache/phoenix/query/QueryServices.java |  1 +
 .../phoenix/query/QueryServicesOptions.java     |  5 +-
 .../apache/phoenix/schema/MetaDataClient.java   |  4 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  2 +-
 .../phoenix/schema/stats/PTableStats.java       |  7 ++
 .../phoenix/schema/stats/PTableStatsImpl.java   | 12 ++-
 .../schema/stats/StatisticsCollector.java       | 79 ++++++++++++--------
 .../phoenix/schema/stats/StatisticsScanner.java |  1 -
 .../phoenix/schema/stats/StatisticsUtil.java    |  6 +-
 .../phoenix/schema/stats/StatisticsWriter.java  | 39 ++++++----
 23 files changed, 259 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
index 14dffcb..1acd5b3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java
@@ -17,16 +17,21 @@
  */
 package org.apache.phoenix.end2end;
 
+import java.util.Map;
+
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 /**
  * Base class for tests that manage their own time stamps
  * We need to separate these from tests that rely on hbase to set
@@ -54,9 +59,17 @@ public abstract class BaseClientManagedTimeIT extends BaseTest {
         deletePriorTables(ts - 1, getUrl());    
     }
     
+    public static Map<String,String> getDefaultProps() {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+        // Must update config before starting server
+        props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, Boolean.FALSE.toString());
+        return props;
+    }
+    
     @BeforeClass
     public static void doSetup() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+        Map<String,String> props = getDefaultProps();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
     @AfterClass

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
index 7a3e86e..f3031f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseQueryIT.java
@@ -48,7 +48,6 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 
 
@@ -70,7 +69,7 @@ public abstract class BaseQueryIT extends BaseClientManagedTimeIT {
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+        Map<String,String> props = getDefaultProps();
         props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
         props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
         // Make a small batch size to test multiple calls to reserve sequences

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
index 98b233c..d709b9c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientTimeArithmeticQueryIT.java
@@ -49,6 +49,7 @@ import java.util.Properties;
 
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -596,5 +597,47 @@ public class ClientTimeArithmeticQueryIT extends BaseQueryIT {
         }
     }
 
+    @Test
+    public void testDateDateSubtract() throws Exception {
+        String url;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+        Connection conn = DriverManager.getConnection(url, props);
+        PreparedStatement statement = conn.prepareStatement("UPSERT INTO ATABLE(organization_id,entity_id,a_time) VALUES(?,?,?)");
+        statement.setString(1, getOrganizationId());
+        statement.setString(2, ROW2);
+        statement.setDate(3, date);
+        statement.execute();
+        statement.setString(2, ROW3);
+        statement.setDate(3, date);
+        statement.execute();
+        statement.setString(2, ROW4);
+        statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+        statement.execute();
+        statement.setString(2, ROW6);
+        statement.setDate(3, new Date(date.getTime() + TestUtil.MILLIS_IN_DAY - 1));
+        statement.execute();
+        statement.setString(2, ROW9);
+        statement.setDate(3, date);
+        statement.execute();
+        conn.commit();
+        conn.close();
+
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 25);
+        conn = DriverManager.getConnection(url, props);
+        try {
+            statement = conn.prepareStatement("SELECT entity_id, b_string FROM ATABLE WHERE a_date - a_time > 1");
+            ResultSet rs = statement.executeQuery();
+            @SuppressWarnings("unchecked")
+            List<List<Object>> expectedResults = Lists.newArrayList(
+                    Arrays.<Object>asList(ROW3, E_VALUE),
+                    Arrays.<Object>asList( ROW6, E_VALUE), 
+                    Arrays.<Object>asList(ROW9, E_VALUE));
+            assertValuesEqualsResultSet(rs, expectedResults);
+        } finally {
+            conn.close();
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
index 48a0581..533143c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InMemoryOrderByIT.java
@@ -24,8 +24,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Maps;
-
 @Category(ClientManagedTimeTest.class)
 public class InMemoryOrderByIT extends OrderByIT {
 
@@ -35,7 +33,7 @@ public class InMemoryOrderByIT extends OrderByIT {
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        Map<String,String> props = getDefaultProps();
         props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1024*1024));
         // Must update config before starting server
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
index f45b689..fe65e10 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryIT.java
@@ -250,7 +250,7 @@ public class QueryIT extends BaseQueryIT {
     @Test
     public void testPointInTimeScan() throws Exception {
         // Override value that was set at creation time
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1); // Run query at timestamp 5
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection upsertConn = DriverManager.getConnection(url, props);
         String upsertStmt =
@@ -267,13 +267,15 @@ public class QueryIT extends BaseQueryIT {
         stmt.setString(2, ROW4);
         stmt.setInt(3, 5);
         stmt.execute(); // should commit too
-        Connection conn1 = DriverManager.getConnection(getUrl(), props);
+        
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);
+        Connection conn1 = DriverManager.getConnection(url, props);
         analyzeTable(conn1, "ATABLE");
         conn1.close();
         upsertConn.close();
 
         // Override value again, but should be ignored since it's past the SCN
-        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3); // Run query at timestamp 5
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 30);
         upsertConn = DriverManager.getConnection(url, props);
         upsertConn.setAutoCommit(true); // Test auto commit
         // Insert all rows at ts
@@ -285,7 +287,7 @@ public class QueryIT extends BaseQueryIT {
         upsertConn.close();
         
         String query = "SELECT organization_id, a_string AS a FROM atable WHERE organization_id=? and a_integer = 5";
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 20));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         PreparedStatement statement = conn.prepareStatement(query);
         statement.setString(1, tenantId);
@@ -394,7 +396,7 @@ public class QueryIT extends BaseQueryIT {
             "    A_TIMESTAMP) " +
             "VALUES (?, ?, ?)";
         // Override value that was set at creation time
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection upsertConn = DriverManager.getConnection(url, props);
         upsertConn.setAutoCommit(true); // Test auto commit
@@ -405,9 +407,12 @@ public class QueryIT extends BaseQueryIT {
         byte[] ts1 = PDataType.TIMESTAMP.toBytes(tsValue1);
         stmt.setTimestamp(3, tsValue1);
         stmt.execute();
-        Connection conn1 = DriverManager.getConnection(getUrl(), props);
+        
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 15);       
+        Connection conn1 = DriverManager.getConnection(url, props);
         analyzeTable(conn1, "ATABLE");
         conn1.close();
+        
         updateStmt = 
             "upsert into " +
             "ATABLE(" +
@@ -426,15 +431,18 @@ public class QueryIT extends BaseQueryIT {
         stmt.setTime(4, new Time(tsValue2.getTime()));
         stmt.execute();
         upsertConn.close();
-        conn1 = DriverManager.getConnection(getUrl(), props);
+        
+        url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 20);       
+        conn1 = DriverManager.getConnection(url, props);
         analyzeTable(conn1, "ATABLE");
         conn1.close();
+        
         analyzeTable(upsertConn, "ATABLE");
         assertTrue(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts2), new ImmutableBytesWritable(ts1)));
         assertFalse(compare(CompareOp.GREATER, new ImmutableBytesWritable(ts1), new ImmutableBytesWritable(ts1)));
 
         String query = "SELECT entity_id, a_timestamp, a_time FROM aTable WHERE organization_id=? and a_timestamp > ?";
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 3)); // Execute at timestamp 2
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30)); // Execute at timestamp 2
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
             PreparedStatement statement = conn.prepareStatement(query);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
index 26d6d4b..e279710 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
@@ -50,7 +50,7 @@ import com.google.common.collect.Maps;
 @Category(HBaseManagedTimeTest.class)
 public class ReverseScanIT extends BaseHBaseManagedTimeIT {
     @BeforeClass
-    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
index 4f2b9a9..b4b0b2e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceIT.java
@@ -51,7 +51,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 @Category(ClientManagedTimeTest.class)
 public class SequenceIT extends BaseClientManagedTimeIT {
@@ -63,11 +62,9 @@ public class SequenceIT extends BaseClientManagedTimeIT {
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
-        // Make a small batch size to test multiple calls to reserve sequences
-        props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
+        Map<String,String> props = getDefaultProps();
         // Must update config before starting server
+        props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
index 2533a29..c35ecab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledOrderByIT.java
@@ -24,15 +24,13 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Maps;
-
 @Category(ClientManagedTimeTest.class)
 public class SpooledOrderByIT extends OrderByIT {
 
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        Map<String,String> props = getDefaultProps();
         props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(100));
         // Must update config before starting server
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 51ad543..b9a0e88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getAllSplits;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -27,9 +29,15 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -41,7 +49,8 @@ import com.google.common.collect.Maps;
 
 @Category(NeedsOwnMiniClusterTest.class)
 public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
-    
+    private static final String STATS_TEST_TABLE_NAME = "S";
+        
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
@@ -222,4 +231,48 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         return stmt;
     }
 
+    private void compactTable(Connection conn) throws IOException, InterruptedException, SQLException {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            admin.flush(STATS_TEST_TABLE_NAME);
+            admin.majorCompact(STATS_TEST_TABLE_NAME);
+            Thread.sleep(10000); // FIXME: how do we know when compaction is done?
+        } finally {
+            admin.close();
+        }
+        services.clearCache();
+    }
+    
+    @Test
+    public void testCompactUpdatesStats() throws Exception {
+        int nRows = 10;
+        Connection conn;
+        PreparedStatement stmt;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE TABLE " + STATS_TEST_TABLE_NAME + "(k CHAR(1) PRIMARY KEY, v INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
+        stmt = conn.prepareStatement("UPSERT INTO " + STATS_TEST_TABLE_NAME + " VALUES(?,?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt.setString(1, Character.toString((char) ('a' + i)));
+            stmt.setInt(2, i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        
+        compactTable(conn);
+        conn = DriverManager.getConnection(getUrl(), props);
+        List<KeyRange>keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+        assertEquals(nRows+1, keyRanges.size());
+        
+        int nDeletedRows = conn.createStatement().executeUpdate("DELETE FROM " + STATS_TEST_TABLE_NAME + " WHERE V < 5");
+        conn.commit();
+        assertEquals(5, nDeletedRows);
+        
+        compactTable(conn);
+        
+        keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+        assertEquals(nRows/2+1, keyRanges.size());
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 642ba62..ac54fe4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -55,8 +55,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Maps;
-
 @Category(ClientManagedTimeTest.class)
 public class UpsertSelectIT extends BaseClientManagedTimeIT {
 	
@@ -64,7 +62,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
   @BeforeClass
   @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
   public static void doSetup() throws Exception {
-      Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+      Map<String,String> props = getDefaultProps();
       props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(500));
       props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index e06a88f..3876b8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -891,7 +891,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
                 if (isType1Date || isType2Date) {
                     if (isType1Date && isType2Date) {
                         i = 2;
-                        theType = PDataType.LONG;
+                        theType = PDataType.DECIMAL;
                     } else if (isType1Date && type2 != null
                             && type2.isCoercibleTo(PDataType.DECIMAL)) {
                         i = 2;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b90fb2e..3abd206 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -608,6 +608,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             try {
                 statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
                 stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
+                timeStamp = Math.max(timeStamp, stats.getTimestamp()); 
             } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
                 logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?");
             } finally {
@@ -1264,32 +1265,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
     
-    private PTable incrementTableTimestamp(byte[] key, long clientTimeStamp) throws IOException, SQLException {
-        HRegion region = env.getRegion();
-        RowLock lid = region.getRowLock(key);
-        if (lid == null) {
-            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
-        }
-        try {
-            PTable table = doGetTable(key, clientTimeStamp, lid);
-            if (table != null) {
-                long tableTimeStamp = table.getTimeStamp() + 1;
-                List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1);
-                Put p = new Put(key);
-                p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, tableTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
-                mutations.add(p);
-                region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet());
-                
-                Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(env).getMetaDataCache();
-                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-                metaDataCache.invalidate(cacheKey);
-            }
-            return table;
-        } finally {
-            lid.release();
-        }
-    }
-    
     private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
         return doGetTable(key, clientTimeStamp, null);
     }
@@ -1711,9 +1686,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[] tableName = request.getTableName().toByteArray();
         try {
             byte[] tenantId = request.getTenantId().toByteArray();
-            long clientTimeStamp = request.getClientTimestamp();
-            byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
-            incrementTableTimestamp(tableKey, clientTimeStamp);
+            byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+            Cache<ImmutableBytesPtr, PTable> metaDataCache =
+                    GlobalCache.getInstance(this.env).getMetaDataCache();
+            metaDataCache.invalidate(cacheKey);
         } catch (Throwable t) {
             logger.error("incrementTableTimeStamp failed", t);
             ProtobufUtil.setControllerException(controller,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 710409f..aba35fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -72,6 +72,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.PColumn;
@@ -459,9 +460,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
                 && scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
             try {
-                // TODO: for users that manage timestamps themselves, we should provide
-                // a means of specifying/getting this.
-                long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+                boolean useCurrentTime = 
+                        c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, 
+                                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+                // Provides a means of clients controlling their timestamps to not use current time
+                // when background tasks are updating stats. Instead we track the max timestamp of
+                // the cells and use that.
+                long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
                 StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), clientTimeStamp);
                 internalScan =
                         stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
@@ -485,9 +490,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
             StatisticsCollector stats = null;
             try {
-                // TODO: for users that manage timestamps themselves, we should provide
-                // a means of specifying/getting this.
-                long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
+                boolean useCurrentTime = 
+                        e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, 
+                                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+                // Provides a means of clients controlling their timestamps to not use current time
+                // when background tasks are updating stats. Instead we track the max timestamp of
+                // the cells and use that.
+                long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
                 stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp);
                 stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
             } catch (IOException ioe) { 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 7f000c0..72002ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -136,6 +136,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String MIN_STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.minUpdateFrequency";
     public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
     public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
+    public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7ee225b..7c8ecd4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -52,6 +52,7 @@ import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -146,7 +147,8 @@ public class QueryServicesOptions {
     public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
 
     public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
-    public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20;
+    public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 100 * 1024 *1024; // 100MB
+    public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
 
     public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
     
@@ -175,6 +177,7 @@ public class QueryServicesOptions {
     public static QueryServicesOptions withDefaults() {
         Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
         QueryServicesOptions options = new QueryServicesOptions(config)
+            .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME)
             .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS)
             .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
             .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index afe21e8..b763bbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -613,13 +613,13 @@ public class MetaDataClient {
         Long scn = connection.getSCN();
         // Always invalidate the cache
         long clientTimeStamp = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
-        String query = "SELECT CURRENT_DATE() - " + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
+        String query = "SELECT CURRENT_DATE()," + LAST_STATS_UPDATE_TIME + " FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME
                 + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
                 + " IS NULL AND " + REGION_NAME + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL";
         ResultSet rs = connection.createStatement().executeQuery(query);
         long msSinceLastUpdate = Long.MAX_VALUE;
         if (rs.next()) {
-            msSinceLastUpdate = rs.getLong(1);
+            msSinceLastUpdate = rs.getLong(1) - rs.getLong(2);
         }
         if (msSinceLastUpdate < msMinBetweenUpdates) {
             return 0;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 2448f39..8f85ccc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -886,7 +886,7 @@ public class PTableImpl implements PTable {
             GuidePostsInfo info = new GuidePostsInfo(pTableStatsProto.getGuidePostsByteCount(), value);
             tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info);
       }
-      PTableStats stats = new PTableStatsImpl(tableGuidePosts);
+      PTableStats stats = new PTableStatsImpl(tableGuidePosts, timeStamp);
 
       PName dataTableName = null;
       if (table.hasDataTableNameBytes()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
index 3745487..435fe87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -37,6 +37,11 @@ public interface PTableStats {
         public int getEstimatedSize() {
             return 0;
         }
+
+        @Override
+        public long getTimestamp() {
+            return StatisticsCollector.NO_TIMESTAMP;
+        }
     };
 
     /**
@@ -47,4 +52,6 @@ public interface PTableStats {
     SortedMap<byte[], GuidePostsInfo> getGuidePosts();
 
     int getEstimatedSize();
+    
+    long getTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
index dcf7b00..dc70e86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
@@ -23,6 +23,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.sun.istack.NotNull;
@@ -33,13 +34,15 @@ import com.sun.istack.NotNull;
 public class PTableStatsImpl implements PTableStats {
     private final SortedMap<byte[], GuidePostsInfo> guidePosts;
     private final int estimatedSize;
+    private final long timeStamp;
 
     public PTableStatsImpl() {
-        this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR));
+        this(new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR), MetaDataProtocol.MIN_TABLE_TIMESTAMP);
     }
 
-    public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts) {
+    public PTableStatsImpl(@NotNull SortedMap<byte[], GuidePostsInfo> guidePosts, long timeStamp) {
         this.guidePosts = guidePosts;
+        this.timeStamp = timeStamp;
         int estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.INT_SIZE + SizedUtil.sizeOfTreeMap(guidePosts.size());
         for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) {
             byte[] cf = entry.getKey();
@@ -84,4 +87,9 @@ public class PTableStatsImpl implements PTableStats {
     public int getEstimatedSize() {
         return estimatedSize;
     }
+
+    @Override
+    public long getTimestamp() {
+        return timeStamp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 53bd18a..3511d12 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -23,8 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -45,12 +43,15 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -64,33 +65,45 @@ import com.google.common.collect.Maps;
  * board for now.
  */
 public class StatisticsCollector {
+    private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class);
+    public static final long NO_TIMESTAMP = -1;
 
     private Map<String, byte[]> minMap = Maps.newHashMap();
     private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
+    private boolean useCurrentTime;
+    private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
     private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
     // Tracks the bytecount per family if it has reached the guidePostsDepth
     private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
     protected StatisticsWriter statsTable;
-    // Ensures that either analyze or compaction happens at any point of time.
-    private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
 
     public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
         Configuration config = env.getConfiguration();
         HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
-        long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
-        if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
-            maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+        useCurrentTime = 
+            config.getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, 
+                    QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+        int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
+        if (guidepostPerRegion > 0) {
+            long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
+            if (maxFileSize <= 0) { // HBase brain dead API doesn't give you the "real" max file size if it's not set...
+                maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
+            }
+            guidepostDepth = maxFileSize / guidepostPerRegion;
+        } else {
+            guidepostDepth = config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
         }
-        guidepostDepth =
-            config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                    maxFileSize / config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 
-                                                QueryServicesOptions.DEFAULT_GUIDE_POSTS_PER_REGION));
         // Get the stats table associated with the current table on which the CP is
         // triggered
         this.statsTable = StatisticsWriter.newWriter(statsHTable, tableName, clientTimeStamp);
     }
     
+    public long getMaxTimeStamp() {
+        return maxTimeStamp;
+    }
+    
     public void close() throws IOException {
         this.statsTable.close();
     }
@@ -99,12 +112,12 @@ public class StatisticsCollector {
         try {
             ArrayList<Mutation> mutations = new ArrayList<Mutation>();
             writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Committing new stats for the region " + region.getRegionInfo());
             }
             commitStats(mutations);
         } catch (IOException e) {
-            LOG.error(e);
+            logger.error("Unable to commit new stats", e);
         } finally {
             clear();
         }
@@ -116,20 +129,20 @@ public class StatisticsCollector {
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
                 if (delete) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
+                    if(logger.isDebugEnabled()) {
+                        logger.debug("Deleting the stats for the region "+region.getRegionInfo());
                     }
                     statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()),
                             mutations);
                 }
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Adding new stats for the region "+region.getRegionInfo());
+                if(logger.isDebugEnabled()) {
+                    logger.debug("Adding new stats for the region "+region.getRegionInfo());
                 }
                 statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()),
                         mutations);
             }
         } catch (IOException e) {
-            LOG.error("Failed to update statistics table!", e);
+            logger.error("Failed to update statistics table!", e);
             throw e;
         }
     }
@@ -147,7 +160,7 @@ public class StatisticsCollector {
                         mutations);
             }
         } catch (IOException e) {
-            LOG.error("Failed to delete from statistics table!", e);
+            logger.error("Failed to delete from statistics table!", e);
             throw e;
         }
     }
@@ -195,8 +208,8 @@ public class StatisticsCollector {
                 internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
                         smallestReadPoint, earliestPutTs);
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Compaction scanner created for stats");
+            if (logger.isDebugEnabled()) {
+                logger.debug("Compaction scanner created for stats");
             }
             InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
             if (scanner != null) {
@@ -212,22 +225,22 @@ public class StatisticsCollector {
             // Create a delete operation on the parent region
             // Then write the new guide posts for individual regions
             List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
-            long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+            long currentTime = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : -1;
             deleteStatsFromStatsTable(region, mutations, currentTime);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Collecting stats for the daughter region " + l.getRegionInfo());
             }
             collectStatsForSplitRegions(conf, l, mutations, currentTime);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Collecting stats for the daughter region " + r.getRegionInfo());
             }
             collectStatsForSplitRegions(conf, r, mutations, currentTime);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
             }
             commitStats(mutations);
         } catch (IOException e) {
-            LOG.error("Error while capturing stats after split of region "
+            logger.error("Error while capturing stats after split of region "
                     + region.getRegionInfo().getRegionNameAsString(), e);
         }
     }
@@ -244,13 +257,13 @@ public class StatisticsCollector {
             count = scanRegion(scanner, count);
             writeStatsToStatsTable(daughter, false, mutations, currentTime);
         } catch (IOException e) {
-            LOG.error(e);
+            logger.error("Unable to collects stats during split", e);
             toThrow = e;
         } finally {
                 try {
                     if (scanner != null) scanner.close();
                 } catch (IOException e) {
-                    LOG.error(e);
+                    logger.error("Unable to close scanner after split", e);
                     if (toThrow != null) toThrow = e;
                 } finally {
                     if (toThrow != null) throw toThrow;
@@ -278,6 +291,7 @@ public class StatisticsCollector {
         this.minMap.clear();
         this.guidePostsMap.clear();
         this.familyMap.clear();
+        maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
     }
 
     public void updateStatistic(KeyValue kv) {
@@ -302,6 +316,7 @@ public class StatisticsCollector {
                 maxMap.put(fam, row);
             }
         }
+        maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
         // TODO : This can be moved to an interface so that we could collect guide posts in different ways
         Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam);
         if (gps == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 60b9601..3a84cfc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -35,7 +35,6 @@ public class StatisticsScanner implements InternalScanner {
 
     public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
             InternalScanner delegate, byte[] family) {
-        // should there be only one tracker?
         this.tracker = tracker;
         this.stats = stats;
         this.delegate = delegate;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index b8d64bd..eb183e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -72,6 +72,7 @@ public class StatisticsUtil {
         s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
         ResultScanner scanner = statsHTable.getScanner(s);
         Result result = null;
+        long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
         TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR);
         while ((result = scanner.next()) != null) {
             CellScanner cellScanner = result.cellScanner();
@@ -88,10 +89,13 @@ public class StatisticsUtil {
                 if (oldInfo != null) {
                     newInfo.combine(oldInfo);
                 }
+                if (current.getTimestamp() > timeStamp) {
+                    timeStamp = current.getTimestamp();
+                }
             }
         }
         if (!guidePostsPerCf.isEmpty()) {
-            return new PTableStatsImpl(guidePostsPerCf);
+            return new PTableStatsImpl(guidePostsPerCf, timeStamp);
         }
         return PTableStats.EMPTY_STATS;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a8a023a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6da135e..22f0ead 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -62,7 +62,9 @@ public class StatisticsWriter implements Closeable {
             clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
         }
         StatisticsWriter statsTable = new StatisticsWriter(hTable, tableName, clientTimeStamp);
-        statsTable.commitLastStatsUpdatedTime();
+        if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts yet
+            statsTable.commitLastStatsUpdatedTime();
+        }
         return statsTable;
     }
 
@@ -101,26 +103,31 @@ public class StatisticsWriter implements Closeable {
      */
     public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
         if (tracker == null) { return; }
-
+        boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
+        long timeStamp = clientTimeStamp;
+        if (useMaxTimeStamp) { // When using max timestamp, we write the update time later because we only know the ts now
+            timeStamp = tracker.getMaxTimeStamp();
+            mutations.add(getLastStatsUpdatedTimePut(timeStamp));
+        }
         byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
                 PDataType.VARCHAR.toBytes(regionName));
         Put put = new Put(prefix);
         GuidePostsInfo gp = tracker.getGuidePosts(fam);
         if (gp != null) {
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES,
-                    clientTimeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
+                    timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size())));
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
-                    clientTimeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
+                    timeStamp, PDataType.VARBINARY.toBytes(gp.toBytes()));
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
-                    clientTimeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
+                    timeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
         }
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
-                clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
+                timeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
-                clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+                timeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
         // Add our empty column value so queries behave correctly
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
-                clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+                timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
         mutations.add(put);
     }
 
@@ -153,21 +160,27 @@ public class StatisticsWriter implements Closeable {
         }
     }
 
-    private void commitLastStatsUpdatedTime() throws IOException {
-        // Always use wallclock time for this, as it's a mechanism to prevent
-        // stats from being collected too often.
+    private Put getLastStatsUpdatedTimePut(long timeStamp) {
         long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
         byte[] prefix = tableName;
         Put put = new Put(prefix);
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp,
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, timeStamp,
                 PDataType.DATE.toBytes(new Date(currentTime)));
+        return put;
+    }
+
+    private void commitLastStatsUpdatedTime() throws IOException {
+        // Always use wallclock time for this, as it's a mechanism to prevent
+        // stats from being collected too often.
+        Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
         statisticsTable.put(put);
     }
     
     public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
             throws IOException {
+        long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp;
         byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
                 PDataType.VARCHAR.toBytes(regionName));
-        mutations.add(new Delete(prefix, clientTimeStamp - 1));
+        mutations.add(new Delete(prefix, timeStamp - 1));
     }
 }
\ No newline at end of file


Mime
View raw message