phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [1/5] git commit: PHOENIX-1320 Update stats atomically
Date Mon, 06 Oct 2014 01:43:55 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master b1be0f8b8 -> 166671c89


PHOENIX-1320 Update stats atomically


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8621b7c9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8621b7c9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8621b7c9

Branch: refs/heads/master
Commit: 8621b7c9b7d2ca89643fdabf0948e30781326a27
Parents: b1be0f8
Author: James Taylor <jtaylor@salesforce.com>
Authored: Sat Oct 4 16:54:36 2014 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Sun Oct 5 18:42:45 2014 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 58 ++++++++-------
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  3 +-
 .../query/ConnectionQueryServicesImpl.java      | 44 ++++-------
 .../apache/phoenix/query/QueryConstants.java    |  5 +-
 .../schema/stat/StatisticsCollector.java        | 66 +++++++++--------
 .../phoenix/schema/stat/StatisticsScanner.java  | 34 ++++++---
 .../phoenix/schema/stat/StatisticsTable.java    | 78 ++++++++++++--------
 .../phoenix/schema/stat/StatisticsUtils.java    |  8 --
 .../org/apache/phoenix/util/SchemaUtil.java     |  5 ++
 9 files changed, 166 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0bf2710..4ddb322 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -34,8 +34,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
@@ -83,7 +81,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.stat.StatisticsCollector;
-import org.apache.phoenix.schema.stat.StatisticsTable;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -116,8 +113,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
     public static final String EMPTY_CF = "EmptyCF";
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
-    private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class);
-    private StatisticsTable statsTable = null;
     
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -125,8 +120,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
         // be able to get a single backing buffer for a KeyValue.
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
-        String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getTableName().getNameAsString();
-        this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name);
     }
 
     private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
@@ -161,12 +154,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
         int offset = 0;
-        boolean isAnalyze = false;
         HRegion region = c.getEnvironment().getRegion();
         StatisticsCollector stats = null;
-        if(ScanUtil.isAnalyzeTable(scan) && statsTable != null) {
-            stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
-            isAnalyze = true;
+        if(ScanUtil.isAnalyzeTable(scan)) {
+            // Let this throw, as this scan is being done for the sole purpose of collecting stats
+            stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString());
         }
         if (ScanUtil.isLocalIndex(scan)) {
             /*
@@ -260,7 +252,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
                 hasMore = innerScanner.nextRaw(results);
-                if(isAnalyze && stats != null) {
+                if(stats != null) {
                     stats.collectStatistics(results);
                 }
                 
@@ -383,13 +375,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             } while (hasMore);
         } finally {
             try {
-                if (isAnalyze && stats != null) {
-                    stats.updateStatistic(region);
-                    stats.clear();
+                if (stats != null) {
+                    try {
+                        stats.updateStatistic(region);
+                    } finally {
+                        stats.close();
+                    }
                 }
-                innerScanner.close();
             } finally {
-                region.closeRegionOperation();
+                try {
+                    innerScanner.close();
+                } finally {
+                    region.closeRegionOperation();
+                }
             }
         }
         
@@ -458,9 +456,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
         if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
                 && scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
-            StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
-            internalScan =
-                    stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+            try {
+                // TODO: when does this get closed?
+                StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString());
+                internalScan =
+                        stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+            } catch (IOException e) {
+                // If we can't reach the stats table, don't interrupt the normal
+                // compaction operation, just log a warning.
+                if(logger.isWarnEnabled()) {
+                    logger.warn("Unable to collect stats for " + table, e);
+                }
+            }
         }
         return internalScan;
     }
@@ -472,15 +479,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         HRegion region = e.getEnvironment().getRegion();
         TableName table = region.getRegionInfo().getTable();
         if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+            StatisticsCollector stats = null;
             try {
-                StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment()
-                        .getConfiguration());
+                stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString());
                 stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
-                stats.clear();
             } catch (IOException ioe) { 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Error while collecting stats during split ",ioe);
+                if(logger.isWarnEnabled()) {
+                    logger.warn("Error while collecting stats during split for " + table,ioe);
                 }
+            } finally {
+                if (stats != null) stats.close();
             }
         }
             

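The rewritten finally block above chains the three cleanups (stats collector, inner scanner, region operation) in nested try/finally blocks so that each close runs even if an earlier one throws. A minimal sketch of the idiom, using a hypothetical closeAll helper in place of the inlined code:

    // Each close() is attempted even if an earlier one throws. Note the
    // pre-Java-7 semantics: an exception thrown inside a finally block
    // replaces any exception already in flight.
    static void closeAll(java.io.Closeable stats, java.io.Closeable scanner,
            java.io.Closeable regionOp) throws java.io.IOException {
        try {
            if (stats != null) stats.close();
        } finally {
            try {
                scanner.close();
            } finally {
                regionOp.close();
            }
        }
    }

Preserving the first exception instead of the last requires the explicit toThrow bookkeeping used in StatisticsScanner.close() further below.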
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index a800fd9..7afeb6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -107,9 +107,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final String SYSTEM_CATALOG_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE);
     public static final byte[] SYSTEM_CATALOG_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_CATALOG_SCHEMA_BYTES);
     public static final String SYSTEM_STATS_TABLE = "STATS";
-    public static final byte[] SYSTEM_STATS_BYTES = Bytes.toBytes(SYSTEM_STATS_TABLE);
     public static final String SYSTEM_STATS_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE);
-    public static final byte[] SYSTEM_STATS_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_STATS_BYTES);
+    public static final byte[] SYSTEM_STATS_NAME_BYTES = Bytes.toBytes(SYSTEM_STATS_NAME);
     
     public static final String SYSTEM_CATALOG_ALIAS = "\"SYSTEM.TABLE\"";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9c520d8..efb4bd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -103,7 +104,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.schema.EmptySequenceCacheException;
-import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -590,13 +590,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
             // TODO: better encapsulation for this
-            // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
-            // don't install on the metadata table until we fix the TODO there.
-            if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) && !SchemaUtil.isMetaTable(tableName) && !descriptor.hasCoprocessor(Indexer.class.getName())) {
+            // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
+            // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS tables, because we use
+            // an all-or-none mutation class which breaks when this coprocessor is installed (PHOENIX-1318).
+            if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) 
+                    && !SchemaUtil.isMetaTable(tableName)
+                    && !SchemaUtil.isStatsTable(tableName) 
+                    && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
                 opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                 Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts);
             }
+            if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
+                descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
+                        null, 1, null);
+            }
             
             if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
                     && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(descriptor
@@ -730,12 +738,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType, props, families, splits);
             
             if (!tableExist) {
-                /*
-                 * Remove the splitPolicy attribute due to an HBase bug (see below)
-                 */
-                if (isMetaTable) {
-                    newDesc.remove(HTableDescriptor.SPLIT_POLICY);
-                }
                 if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
                     newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
                 }
@@ -752,31 +754,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 if (isMetaTable) {
                     checkClientServerCompatibility();
-                    /*
-                     * Now we modify the table to add the split policy, since we know that the client and
-                     * server and compatible. This works around a nasty, known HBase bug where if a split
-                     * policy class cannot be found on the server, the HBase table is left in a horrible
-                     * "ghost" state where it can't be used and can't be deleted without bouncing the master. 
-                     */
-                    newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName());
-                    admin.disableTable(tableName);
-                    admin.modifyTable(tableName, newDesc);
-                    admin.enableTable(tableName);
                 }
                 return null;
             } else {
-                if (!modifyExistingMetaData || existingDesc.equals(newDesc)) {
-                    // Table is already created. Note that the presplits are ignored in this case
-                    if (isMetaTable) {
-                        checkClientServerCompatibility();
-                    }
-                    return existingDesc;
-                }
-
                 if (isMetaTable) {
                     checkClientServerCompatibility();
                 }
                          
+                if (!modifyExistingMetaData || existingDesc.equals(newDesc)) {
+                    return existingDesc;
+                }
+
+                // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true
                 admin.disableTable(tableName);
                 admin.modifyTable(tableName, newDesc);

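The new branch above installs HBase's built-in MultiRowMutationEndpoint on SYSTEM.STATS, which is what enables the atomic commitStats path in StatisticsTable further below. A hedged sketch of the same descriptor setup in isolation; the helper method name is illustrative only:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;

    static void installMultiRowMutation(HTableDescriptor desc) throws IOException {
        // Guard against double-installation, mirroring generateTableDescriptor above.
        if (!desc.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
            desc.addCoprocessor(MultiRowMutationEndpoint.class.getName(), null, 1, null);
        }
    }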
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index bbc653e..84bb516 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -226,6 +226,7 @@ public interface QueryConstants {
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            // Install split policy to prevent a tenant's metadata from being split across regions.
             HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
     
     public static final String CREATE_STATS_TABLE_METADATA = 
@@ -241,7 +242,9 @@ public interface QueryConstants {
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY ("
             + PHYSICAL_NAME + ","
             + COLUMN_FAMILY + ","+ REGION_NAME+"))\n" +
-            HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            // TODO: should we support versioned stats?
+            // HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            // Install split policy to prevent a physical table's stats from being split across regions.
             HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
             
     public static final String CREATE_SEQUENCE_METADATA =

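Both CREATE statements install MetaDataSplitPolicy for the same underlying reason: the atomic multi-row commit used for stats (see StatisticsTable below) can only be applied when every row in the batch lives in a single region, and all rows for one tenant or one physical table share that name as their row key prefix. A hedged illustration of the stats row key layout, reusing the getRowKey call seen in StatisticsCollector; the literal values are made up:

    // Both keys below start with the physical table name "T", so a split policy
    // that keeps rows sharing that prefix together leaves them in one region.
    byte[] guidePostKey = StatisticsUtils.getRowKey(
            PDataType.VARCHAR.toBytes("T"),           // physical table name
            PDataType.VARCHAR.toBytes("0"),           // column family
            PDataType.VARCHAR.toBytes("region-a"));   // region name
    byte[] lastUpdateKey = PDataType.VARCHAR.toBytes("T"); // LAST_STATS_UPDATE_TIME row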
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index 82ae309..bb05a32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PDataType;
@@ -51,7 +54,11 @@ import com.google.common.collect.Maps;
 
 /**
  * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and
- * guideposts
+ * guideposts.
+ * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should
+ * honor that with timestamps for stats as well. The issue is for compaction, though. I don't know of
+ * a way for the user to specify any timestamp for that. Perhaps best to use current time across the
+ * board for now.
  */
 public class StatisticsCollector {
 
@@ -64,13 +71,19 @@ public class StatisticsCollector {
     // Ensures that either analyze or compaction happens at any point of time.
     private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
 
-    public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException {
+    public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName) throws IOException {
+        guidepostDepth =
+            env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
         // Get the stats table associated with the current table on which the CP is
         // triggered
-        this.statsTable = statsTable;
-        guidepostDepth =
-                conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                    QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+        this.statsTable = StatisticsTable.getStatisticsTable(
+                env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES)));
+        this.statsTable.commitLastStatsUpdatedTime(tableName, TimeKeeper.SYSTEM.getCurrentTime());
+    }
+    
+    public void close() throws IOException {
+        this.statsTable.close();
     }
 
     public void updateStatistic(HRegion region) {
@@ -119,10 +132,11 @@ public class StatisticsCollector {
 
     private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
         try {
+            String tableName = region.getRegionInfo().getTable().getNameAsString();
+            String regionName = region.getRegionInfo().getRegionNameAsString();
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
-                String tableName = region.getRegionInfo().getTable().getNameAsString();
-                statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                statsTable.deleteStats(tableName, regionName, this,
                         Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
             }
         } catch (IOException e) {
@@ -147,7 +161,7 @@ public class StatisticsCollector {
     }
 
     /**
-     * Update the current statistics based on the lastest batch of key-values from the underlying scanner
+     * Update the current statistics based on the latest batch of key-values from the underlying scanner
      * 
      * @param results
      *            next batch of {@link KeyValue}s
@@ -188,24 +202,19 @@ public class StatisticsCollector {
     public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r,
             HRegion region) {
         try {
-            if (familyMap != null) {
-                familyMap.clear();
-            }
             // Create a delete operation on the parent region
             // Then write the new guide posts for individual regions
-            // TODO : Try making this atomic
             List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
             long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+            deleteStatsFromStatsTable(region, mutations, currentTime);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
             }
-            collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
-            clear();
+            collectStatsForSplitRegions(conf, l, mutations, currentTime);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
             }
-            collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
-            clear();
+            collectStatsForSplitRegions(conf, r, mutations, currentTime);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Committing stats for the daughter regions as part of split " +
r.getRegionInfo());
             }
@@ -216,32 +225,29 @@ public class StatisticsCollector {
         }
     }
 
-    private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, HRegion parent, boolean delete,
+    private void collectStatsForSplitRegions(Configuration conf, HRegion daughter,
             List<Mutation> mutations, long currentTime) throws IOException {
+        IOException toThrow = null;
+        clear();
         Scan scan = createScan(conf);
         RegionScanner scanner = null;
         int count = 0;
         try {
             scanner = daughter.getScanner(scan);
             count = scanRegion(scanner, count);
+            writeStatsToStatsTable(daughter, false, mutations, currentTime);
         } catch (IOException e) {
             LOG.error(e);
-            throw e;
+            toThrow = e;
         } finally {
-            if (scanner != null) {
                 try {
-                    if (delete) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Deleting the stats for the parent region " + parent.getRegionInfo());
-                        }
-                        deleteStatsFromStatsTable(parent, mutations, currentTime);
-                    }
-                    writeStatsToStatsTable(daughter, false, mutations, currentTime);
+                    if (scanner != null) scanner.close();
                 } catch (IOException e) {
                     LOG.error(e);
-                    throw e;
+                    if (toThrow == null) toThrow = e;
+                } finally {
+                    if (toThrow != null) throw toThrow;
                 }
-            }
         }
     }
 
@@ -256,7 +262,7 @@ public class StatisticsCollector {
 
     protected InternalScanner getInternalScanner(HRegion region, Store store,
             InternalScanner internalScan, String family) {
-        return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan,
+        return new StatisticsScanner(this, statsTable, region, internalScan,
                 Bytes.toBytes(family));
     }
 

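With this constructor change the collector opens its own handle to SYSTEM.STATS through env.getTable(...) rather than sharing a cached StatisticsTable, so each caller is now responsible for closing it. A hedged usage sketch mirroring the split hook; env, tableName, l, r, and region are assumed in scope and the enclosing method to declare IOException:

    StatisticsCollector stats = null;
    try {
        stats = new StatisticsCollector(env, tableName);
        stats.collectStatsDuringSplit(env.getConfiguration(), l, r, region);
    } finally {
        // Releases the underlying SYSTEM.STATS HTableInterface.
        if (stats != null) stats.close();
    }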
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
index 86ffca7..79f64fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -15,10 +15,10 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.util.SchemaUtil;
@@ -31,11 +31,11 @@ public class StatisticsScanner implements InternalScanner {
     private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
     private InternalScanner delegate;
     private StatisticsTable stats;
-    private HRegionInfo region;
+    private HRegion region;
     private StatisticsCollector tracker;
     private byte[] family;
 
-    public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region,
+    public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegion region,
             InternalScanner delegate, byte[] family) {
         // should there be only one tracker?
         this.tracker = tracker;
@@ -75,12 +75,13 @@ public class StatisticsScanner implements InternalScanner {
         }
     }
 
+    @Override
     public void close() throws IOException {
         IOException toThrow = null;
         try {
             // update the statistics table
            // Just verify if this is fine
-            String tableName = SchemaUtil.getTableNameFromFullName(region.getTable().getNameAsString());
+            String tableName = SchemaUtil.getTableNameFromFullName(region.getRegionInfo().getTable().getNameAsString());
             ArrayList<Mutation> mutations = new ArrayList<Mutation>();
             long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
             if (LOG.isDebugEnabled()) {
@@ -103,12 +104,25 @@ public class StatisticsScanner implements InternalScanner {
         } catch (IOException e) {
             LOG.error("Failed to update statistics table!", e);
             toThrow = e;
-        }
-        // close the delegate scanner
-        try {
-            delegate.close();
-        } catch (IOException e) {
-            LOG.error("Error while closing the scanner");
+        } finally {
+            try {
+                stats.close();
+            } catch (IOException e) {
+                if (toThrow == null) toThrow = e;
+                LOG.error("Error while closing the stats table", e);
+            } finally {
+                // close the delegate scanner
+                try {
+                    delegate.close();
+                } catch (IOException e) {
+                    if (toThrow == null) toThrow = e;
+                    LOG.error("Error while closing the scanner", e);
+                } finally {
+                    if (toThrow != null) {
+                        throw toThrow;
+                    }
+                }
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index bc769e3..ebaa978 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -20,28 +20,30 @@ package org.apache.phoenix.schema.stat;
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.protobuf.ServiceException;
 
 /**
  * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
  */
-@SuppressWarnings("deprecation")
 public class StatisticsTable implements Closeable {
-    /** Map of the currently open statistics tables */
-    private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>();
     /**
      * @param Configuration
      *            Configuration to update the stats table.
@@ -51,20 +53,8 @@ public class StatisticsTable implements Closeable {
      * @throws IOException
      *             if the table cannot be created due to an underlying HTable creation error
      */
-    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf,
-            String primaryTableName) throws IOException {
-        StatisticsTable table = tableMap.get(primaryTableName);
-        if (table == null) {
-            // Map the statics table and the table with which the statistics is
-            // associated. This is a workaround
-            HTablePool pool = new HTablePool(conf,100);
-            //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-            HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-            //h.setAutoFlushTo(true);
-            table = new StatisticsTable(hTable);
-            tableMap.put(primaryTableName, table);
-        }
-        return table;
+    public static StatisticsTable getStatisticsTable(HTableInterface hTable) throws IOException {
+        return new StatisticsTable(hTable);
     }
 
     private final HTableInterface statisticsTable;
@@ -102,21 +92,39 @@ public class StatisticsTable implements Closeable {
         if (tracker == null) { return; }
 
         // Add the timestamp header
-        formLastUpdatedStatsMutation(tableName, currentTime, mutations);
+        commitLastStatsUpdatedTime(tableName, currentTime);
 
         byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
                 PDataType.VARCHAR.toBytes(regionName));
         formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix);
     }
 
+    private static MutationType getMutationType(Mutation m) throws IOException {
+        if (m instanceof Put) {
+            return MutationType.PUT;
+        } else if (m instanceof Delete) {
+            return MutationType.DELETE;
+        } else {
+            throw new DoNotRetryIOException("Unsupported mutation type in stats commit: "
+                    + m.getClass().getName());
+        }
+    }
     public void commitStats(List<Mutation> mutations) throws IOException {
-        Object[] res = new Object[mutations.size()];
-        try {
-            if (mutations.size() > 0) {
-                statisticsTable.batch(mutations, res);
+        if (mutations.size() > 0) {
+            byte[] row = mutations.get(0).getRow();
+            MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+            for (Mutation m : mutations) {
+                mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m));
+            }
+            MutateRowsRequest mrm = mrmBuilder.build();
+            CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row);
+            MultiRowMutationService.BlockingInterface service =
+                    MultiRowMutationService.newBlockingStub(channel);
+            try {
+              service.mutateRows(null, mrm);
+            } catch (ServiceException ex) {
+              throw ProtobufUtil.toIOException(ex);
             }
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while adding deletes and puts");
         }
     }
 
@@ -137,12 +145,20 @@ public class StatisticsTable implements Closeable {
         mutations.add(put);
     }
 
-    private void formLastUpdatedStatsMutation(String tableName, long currentTime, List<Mutation> mutations) throws IOException {
-        byte[] prefix = StatisticsUtils.getRowKeyForTSUpdate(PDataType.VARCHAR.toBytes(tableName));
+    public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException {
+        // always starts with the source table
+        TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length);
+        os.write(table);
+        os.close();
+        return os.getBuffer();
+    }
+
+    public void commitLastStatsUpdatedTime(String tableName, long currentTime) throws IOException {
+        byte[] prefix = PDataType.VARCHAR.toBytes(tableName);
         Put put = new Put(prefix);
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime,
                 PDataType.DATE.toBytes(new Date(currentTime)));
-        mutations.add(put);
+        statisticsTable.put(put);
     }
     
     public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam,

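commitStats is the heart of PHOENIX-1320: instead of a best-effort HTable.batch(), it packs every Put and Delete into a single MutateRowsRequest and executes it through the MultiRowMutationService endpoint installed on SYSTEM.STATS, so a region's guideposts are replaced all or nothing. A condensed sketch of that call path, assuming a statisticsTable handle and a non-empty mutations list whose rows are all co-located in one region (which the split policy above guarantees):

    // Build one atomic request from a mixed list of Puts and Deletes.
    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
    for (Mutation m : mutations) {
        builder.addMutationRequest(ProtobufUtil.toMutation(
                m instanceof Put ? MutationType.PUT : MutationType.DELETE, m));
    }
    // Route the RPC to the region hosting the first row; atomicity holds only
    // because every row in the batch lives in that same region.
    CoprocessorRpcChannel channel = statisticsTable.coprocessorService(mutations.get(0).getRow());
    MultiRowMutationService.BlockingInterface service =
            MultiRowMutationService.newBlockingStub(channel);
    try {
        service.mutateRows(null, builder.build());
    } catch (ServiceException ex) {
        throw ProtobufUtil.toIOException(ex);
    }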
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
index 7cb3a38..8b6d7fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
@@ -48,14 +48,6 @@ public class StatisticsUtils {
         return os.getBuffer();
     }
     
-    public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException {
-        // always starts with the source table
-        TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length);
-        os.write(table);
-        os.close();
-        return os.getBuffer();
-    }
-
     public static byte[] getCFFromRowKey(byte[] table, byte[] row, int rowOffset, int rowLength) {
         // Move over the separator byte that would be written after the table name
         int startOff = Bytes.indexOf(row, table) + (table.length) + 1;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8621b7c9/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c0ee92b..5cc861b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.util;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -344,6 +345,10 @@ public class SchemaUtil {
         return Bytes.compareTo(tableName, SYSTEM_CATALOG_NAME_BYTES) == 0;
     }
     
+    public static boolean isStatsTable(byte[] tableName) {
+        return Bytes.compareTo(tableName, SYSTEM_STATS_NAME_BYTES) == 0;
+    }
+    
     public static boolean isSequenceTable(byte[] tableName) {
         return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0;
     }

