hbase-commits mailing list archives

From: nspiegelb...@apache.org
Subject: svn commit: r1201994 [1/2] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/ test/java/org/apach...
Date: Tue, 15 Nov 2011 01:26:06 GMT
Author: nspiegelberg
Date: Tue Nov 15 01:26:05 2011
New Revision: 1201994

URL: http://svn.apache.org/viewvc?rev=1201994&view=rev
Log:
HBASE-4768 Per-(table, columnFamily) metrics with configurable table name inclusion

Summary: This is an initial version of an HBase trunk diff for per-table/CF
metrics (see JIRA for details). Unit tests mostly pass -- need to look into
TestDistributedLogSplitting. Also still doing cluster testing.

Test Plan: Unit tests, single-node cluster, dev cluster. Need to try bulk-load
map-reduce jobs as well. Observe metrics through JMX.

Reviewers: jgray, nspiegelberg, stack, tedyu, todd, JIRA

Reviewed By: nspiegelberg

CC: nspiegelberg, Liyin, mbautin, tedyu

Differential Revision: 363
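
For orientation: before this patch, readers and writers built flat per-CF
counter names such as "cf.<column_family_name>.fsBlockReadCnt" by hand (see
the code removed from AbstractHFileReader below). The patch centralizes
naming in SchemaMetrics, keyed by table and column family, with table name
inclusion controlled by configuration. A minimal sketch of that naming idea,
with a hypothetical key format (the real format lives in SchemaMetrics.java,
which is in part 2 of this mail):

    // Hypothetical illustration only; not the actual SchemaMetrics code.
    public final class MetricKeySketch {
      /** Builds a per-(table, CF) metric key; table inclusion is optional. */
      static String metricKey(String table, String cf, String metric,
          boolean includeTableName) {
        StringBuilder sb = new StringBuilder();
        if (includeTableName) {
          sb.append("tbl.").append(table).append('.');
        }
        return sb.append("cf.").append(cf).append('.').append(metric)
            .toString();
      }

      public static void main(String[] args) {
        // Prints e.g. "tbl.usertable.cf.actions.fsBlockReadCnt".
        System.out.println(
            metricKey("usertable", "actions", "fsBlockReadCnt", true));
      }
    }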

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaConfigured.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java
Removed:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockInfo.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Tue Nov 15 01:26:05 2011
@@ -23,21 +23,19 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.io.RawComparator;
 
 /**
  * Common functionality needed by all versions of {@link HFile} readers.
  */
-public abstract class AbstractHFileReader implements HFile.Reader {
-
-  private static final Log LOG = LogFactory.getLog(AbstractHFileReader.class);
+public abstract class AbstractHFileReader extends SchemaConfigured
+    implements HFile.Reader {
 
   /** Filesystem-level block reader for this HFile format version. */
   protected HFileBlock.FSReader fsBlockReader;
@@ -92,26 +90,11 @@ public abstract class AbstractHFileReade
 
   protected FileInfo fileInfo;
 
-  /** Prefix of the form cf.<column_family_name> for statistics counters. */
-  private final String cfStatsPrefix;
-
-  // various metrics that we want to track on a per-cf basis
-  public String fsReadTimeNanoMetric = "";
-  public String compactionReadTimeNanoMetric = "";
-
-  public String fsBlockReadCntMetric = "";
-  public String compactionBlockReadCntMetric = "";
-
-  public String fsBlockReadCacheHitCntMetric = "";
-  public String compactionBlockReadCacheHitCntMetric = "";
-
-  public String fsMetaBlockReadCntMetric = "";
-  public String fsMetaBlockReadCacheHitCntMetric = "";
-
   protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long fileSize,
       final boolean closeIStream,
       final CacheConfig cacheConf) {
+    super(null, path);
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
     this.cacheConf = cacheConf;
@@ -120,21 +103,6 @@ public abstract class AbstractHFileReade
     this.closeIStream = closeIStream;
     this.path = path;
     this.name = path.getName();
-    cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
-
-    fsReadTimeNanoMetric = cfStatsPrefix + ".fsReadNano";
-    compactionReadTimeNanoMetric = cfStatsPrefix + ".compactionReadNano";
-
-    fsBlockReadCntMetric = cfStatsPrefix + ".fsBlockReadCnt";
-    fsBlockReadCacheHitCntMetric = cfStatsPrefix + ".fsBlockReadCacheHitCnt";
-
-    compactionBlockReadCntMetric = cfStatsPrefix + ".compactionBlockReadCnt";
-    compactionBlockReadCacheHitCntMetric = cfStatsPrefix
-        + ".compactionBlockReadCacheHitCnt";
-
-    fsMetaBlockReadCntMetric = cfStatsPrefix + ".fsMetaBlockReadCnt";
-    fsMetaBlockReadCacheHitCntMetric = cfStatsPrefix
-        + ".fsMetaBlockReadCacheHitCnt";
   }
 
   @SuppressWarnings("serial")
@@ -154,24 +122,6 @@ public abstract class AbstractHFileReade
     return KeyValue.keyToString(getLastKey());
   }
 
-  /**
-   * Parse the HFile path to figure out which table and column family
-   * it belongs to. This is used to maintain read statistics on a
-   * per-column-family basis.
-   *
-   * @param path HFile path name
-   */
-  public static String parseCfNameFromPath(String path) {
-    String splits[] = path.split("/");
-    if (splits.length < 2) {
-      LOG.warn("Could not determine the table and column family of the " +
-          "HFile path " + path);
-      return "unknown";
-    }
-
-    return splits[splits.length - 2];
-  }
-
   public abstract boolean isFileInfoLoaded();
 
   @Override
@@ -291,11 +241,6 @@ public abstract class AbstractHFileReade
   }
 
   @Override
-  public String getColumnFamilyName() {
-    return cfStatsPrefix;
-  }
-
-  @Override
   public FixedFileTrailer getTrailer() {
     return trailer;
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Tue Nov 15 01:26:05 2011
@@ -26,8 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
@@ -42,9 +41,8 @@ import org.apache.hadoop.io.Writable;
 /**
  * Common functionality needed by all versions of {@link HFile} writers.
  */
-public abstract class AbstractHFileWriter implements HFile.Writer {
-
-  private static final Log LOG = LogFactory.getLog(AbstractHFileWriter.class);
+public abstract class AbstractHFileWriter extends SchemaConfigured
+    implements HFile.Writer {
 
   /** Key previously appended. Becomes the last key in the file. */
   protected byte[] lastKeyBuffer = null;
@@ -94,10 +92,6 @@ public abstract class AbstractHFileWrite
   /** May be null if we were passed a stream. */
   protected final Path path;
 
-  /** Prefix of the form cf.<column_family_name> for statistics counters. */
-  // Note that this is gotten from the path, which can be null, so this can
-  // remain unknown
-  public String cfStatsPrefix = "cf.unknown";
 
   /** Cache configuration for caching data on write. */
   protected final CacheConfig cacheConf;
@@ -111,6 +105,7 @@ public abstract class AbstractHFileWrite
   public AbstractHFileWriter(CacheConfig cacheConf,
       FSDataOutputStream outputStream, Path path, int blockSize,
       Compression.Algorithm compressAlgo, KeyComparator comparator) {
+    super(null, path);
     this.outputStream = outputStream;
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
@@ -122,27 +117,6 @@ public abstract class AbstractHFileWrite
 
     closeOutputStream = path != null;
     this.cacheConf = cacheConf;
-
-    if (path != null)
-      cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
-  }
-
-  /**
-   * Parse the HFile path to figure out which table and column family it belongs
-   * to. This is used to maintain read statistics on a per-column-family basis.
-   *
-   * @param path
-   *          HFile path name
-   */
-  public static String parseCfNameFromPath(String path) {
-    String splits[] = path.split("/");
-    if (splits.length < 2) {
-      LOG.warn("Could not determine the table and column family of the "
-          + "HFile path " + path);
-      return "unknown";
-    }
-
-    return splits[splits.length - 2];
   }
 
   /**
@@ -250,11 +224,6 @@ public abstract class AbstractHFileWrite
   }
 
   @Override
-  public String getColumnFamilyName() {
-    return cfStatsPrefix;
-  }
-
-  @Override
   public String toString() {
     return "writer=" + (path != null ? path.toString() : null) + ", name="
         + name + ", compression=" + compressAlgo.getName();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Tue Nov 15 01:26:05 2011
@@ -78,7 +78,18 @@ public enum BlockType {
   INDEX_V1("IDXBLK)+", BlockCategory.INDEX);
 
   public enum BlockCategory {
-    DATA, META, INDEX, BLOOM
+    DATA, META, INDEX, BLOOM, ALL_CATEGORIES, UNKNOWN;
+
+    /**
+     * Throws an exception if the block category passed is the special category
+     * meaning "all categories".
+     */
+    public void expectSpecific() {
+      if (this == ALL_CATEGORIES) {
+        throw new IllegalArgumentException("Expected a specific block " +
+            "category but got " + this);
+      }
+    }
   }
 
   public static final int MAGIC_LENGTH = 8;
@@ -104,8 +115,8 @@ public enum BlockType {
     buf.put(magic);
   }
 
-  public String getMetricName(){
-    return metricCat.toString();
+  public BlockCategory getCategory() {
+    return metricCat;
   }
 
   public static BlockType parse(byte[] buf, int offset, int length)
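
A short usage sketch for the new expectSpecific() guard above, with a
hypothetical caller (only the guard itself is from this patch): a counter
update for one concrete block category rejects the ALL_CATEGORIES
pseudo-category up front instead of silently double-counting.

    import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;

    // Hypothetical caller; illustrates the intended use of the guard.
    public final class CategoryGuardSketch {
      static void recordBlockRead(BlockCategory category) {
        category.expectSpecific(); // throws on ALL_CATEGORIES
        // ... update the counter for this specific category ...
      }
    }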

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Tue Nov 15 01:26:05 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile
 
 import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 
 /**
  * Cacheable is an interface that allows for an object to be cached. If using an
@@ -53,4 +54,15 @@ public interface Cacheable extends HeapS
   * @return CacheableDeserializer instance.
    */
   public CacheableDeserializer<Cacheable> getDeserializer();
+
+  /**
+   * @return the block type of this cached HFile block
+   */
+  public BlockType getBlockType();
+
+  /**
+   * @return the metrics object identified by table and column family
+   */
+  public SchemaMetrics getSchemaMetrics();
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Nov 15 01:26:05 2011
@@ -41,6 +41,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -142,6 +145,14 @@ public class HFile {
   /** Separator between HFile name and offset in block cache key */
   static final char CACHE_KEY_SEPARATOR = '_';
 
+  /**
+   * We assume that HFile path ends with
+   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
+   * many levels of nesting. This is needed for identifying table and CF name
+   * from an HFile path.
+   */
+  public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
+
   // For measuring latency of "typical" reads and writes
   static volatile AtomicInteger readOps = new AtomicInteger();
   static volatile AtomicLong readTimeNano = new AtomicLong();
@@ -258,6 +269,7 @@ public class HFile {
    */
   public static final WriterFactory getWriterFactory(Configuration conf,
       CacheConfig cacheConf) {
+    SchemaMetrics.configureGlobally(conf);
     int version = getFormatVersion(conf);
     switch (version) {
     case 1:
@@ -278,7 +290,8 @@ public class HFile {
   }
 
   /** An interface used by clients to open and iterate an {@link HFile}. */
-  public interface Reader extends Closeable, CachingBlockReader {
+  public interface Reader extends Closeable, CachingBlockReader,
+      SchemaAware {
     /**
      * Returns this reader's "name". Usually the last component of the path.
      * Needs to be constant as the file is being moved to support caching on
@@ -291,10 +304,10 @@ public class HFile {
     RawComparator<byte []> getComparator();
 
     HFileScanner getScanner(boolean cacheBlocks,
-        final boolean pread, final boolean isCompaction);
+       final boolean pread, final boolean isCompaction);
 
     ByteBuffer getMetaBlock(String metaBlockName,
-        boolean cacheBlock) throws IOException;
+       boolean cacheBlock) throws IOException;
 
     Map<byte[], byte[]> loadFileInfo() throws IOException;
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Tue Nov 15 01:26:05 2011
@@ -29,12 +29,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
 import org.apache.hadoop.hbase.io.DoubleOutputStream;
-import org.apache.hadoop.hbase.io.hfile.HFileBlockInfo;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompoundBloomFilter;
@@ -74,7 +76,7 @@ import static org.apache.hadoop.hbase.io
  * The version 2 block representation in the block cache is the same as above,
  * except that the data section is always uncompressed in the cache.
  */
-public class HFileBlock implements Cacheable, HFileBlockInfo {
+public class HFileBlock extends SchemaConfigured implements Cacheable {
 
   /** The size of a version 2 {@link HFile} block header */
   public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
@@ -86,26 +88,27 @@ public class HFileBlock implements Cache
   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
 
-  static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
-
+  static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
+      Bytes.SIZEOF_INT;
 
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
-  new CacheableDeserializer<Cacheable>() {
-    public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-      ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
-          - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-      buf.limit(buf.limit()
-          - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-      newByteBuffer.put(buf);
-      HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
-
-      buf.position(buf.limit());
-      buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
-      ourBuffer.offset = buf.getLong();
-      ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
-      return ourBuffer;
-    }
-  };
+      new CacheableDeserializer<Cacheable>() {
+        public HFileBlock deserialize(ByteBuffer buf) throws IOException{
+          ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
+              - HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          buf.limit(buf.limit()
+              - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+          newByteBuffer.put(buf);
+          HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
+
+          buf.position(buf.limit());
+          buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          ourBuffer.offset = buf.getLong();
+          ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
+          return ourBuffer;
+        }
+      };
+
   private BlockType blockType;
   private final int onDiskSizeWithoutHeader;
   private final int uncompressedSizeWithoutHeader;
@@ -157,16 +160,6 @@ public class HFileBlock implements Cache
     this.offset = offset;
   }
 
-  private String cfStatsPrefix = "cf.unknown";
-
-  public String getColumnFamilyName() {
-    return this.cfStatsPrefix;
-  }
-
-  public void setColumnFamilyName(String cfName) {
-    this.cfStatsPrefix = cfName;
-  }
-
   /**
    * Creates a block from an existing buffer starting with a header. Rewinds
    * and takes ownership of the buffer. By definition of rewind, ignores the
@@ -423,23 +416,24 @@ public class HFileBlock implements Cache
 
   @Override
   public long heapSize() {
-    // This object, block type and byte buffer reference, on-disk and
-    // uncompressed size, next block's on-disk size, offset and previous
-    // offset, byte buffer object, and its byte array. Might also need to add
-    // some fields inside the byte buffer.
-
-    // We only add one BYTE_BUFFER_HEAP_SIZE because at any given moment, one of
-    // the bytebuffers will be null. But we do account for both references.
+    long size = ClassSize.align(
+        // This object
+        ClassSize.OBJECT +
+        // Block type and byte buffer references
+        2 * ClassSize.REFERENCE +
+        // On-disk size, uncompressed size, and next block's on-disk size
+        3 * Bytes.SIZEOF_INT +
+        // This and previous block offset
+        2 * Bytes.SIZEOF_LONG
+    );
 
-    // If we are on heap, then we add the capacity of buf.
     if (buf != null) {
-      return ClassSize.align(ClassSize.OBJECT + 3 * ClassSize.REFERENCE + 3
-          * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE)
-          + ClassSize.align(buf.capacity());
-    } else {
-      return ClassSize.align(ClassSize.OBJECT + 3 * ClassSize.REFERENCE + 3
-          * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE);
+      size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
     }
+
+    // SchemaConfigured (but don't count object overhead twice).
+    size += super.heapSize() - ClassSize.OBJECT;
+    return size;
   }
 
   /**
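
As a worked example of the heapSize() layout above, assuming ClassSize.OBJECT
is 16 bytes, ClassSize.REFERENCE is 8, and 8-byte alignment (the actual
values come from HBase's ClassSize and depend on the JVM):

    // Assumed constants for illustration; the real code uses ClassSize.
    public final class HeapSizeSketch {
      public static void main(String[] args) {
        int OBJECT = 16, REFERENCE = 8, SIZEOF_INT = 4, SIZEOF_LONG = 8;
        int fixed = OBJECT + 2 * REFERENCE + 3 * SIZEOF_INT
            + 2 * SIZEOF_LONG;          // 16 + 16 + 12 + 16 = 60
        int aligned = (fixed + 7) & ~7; // aligned up to 64
        System.out.println(aligned);
        // A block with a buffer then adds align(buf.capacity() +
        // BYTE_BUFFER_HEAP_SIZE), plus the SchemaConfigured parent's heap
        // size minus one OBJECT, so object overhead is not counted twice.
      }
    }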

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Tue Nov 15 01:26:05 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
@@ -83,13 +84,6 @@ public class HFileBlockIndex {
       "Inline blocks are not allowed in the single-level-only mode";
 
   /**
-   * Configuration key to cache leaf- and intermediate-level index blocks on
-   * write.
-   */
-  public static final String CACHE_INDEX_BLOCKS_ON_WRITE_KEY =
-      "hfile.block.index.cacheonwrite";
-
-  /**
    * The size of a meta-data record used for finding the mid-key in a
    * multi-level index. Consists of the middle leaf-level index block offset
    * (long), its on-disk size without header included (int), and the mid-key
@@ -609,7 +603,8 @@ public class HFileBlockIndex {
    * index. However, in most practical cases we will only have leaf-level
    * blocks and the root index, or just the root index.
    */
-  public static class BlockIndexWriter implements InlineBlockWriter {
+  public static class BlockIndexWriter extends SchemaConfigured
+      implements InlineBlockWriter {
     /**
      * While the index is being written, this represents the current block
      * index referencing all leaf blocks, with one exception. If the file is
@@ -838,8 +833,10 @@ public class HFileBlockIndex {
       blockWriter.writeHeaderAndData(out);
 
       if (blockCache != null) {
+        HFileBlock blockForCaching = blockWriter.getBlockForCaching();
+        passSchemaMetricsTo(blockForCaching);
         blockCache.cacheBlock(HFile.getBlockCacheKey(nameForCaching,
-            beginOffset), blockWriter.getBlockForCaching());
+            beginOffset), blockForCaching);
       }
 
       // Add intermediate index block size
@@ -1302,14 +1299,6 @@ public class HFileBlockIndex {
 
   }
 
-  /**
-   * @return true if the given configuration specifies that we should
-   *         cache-on-write index blocks
-   */
-  public static boolean shouldCacheOnWrite(Configuration conf) {
-    return conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
-  }
-
   public static int getMaxChunkSize(Configuration conf) {
     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Tue Nov 15 01:26:05 2011
@@ -30,10 +30,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
@@ -216,7 +216,7 @@ public class HFileReaderV1 extends Abstr
     // Per meta key from any given file, synchronize reads for said block
     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
       metaLoads.incrementAndGet();
-      HRegion.incrNumericMetric(this.fsMetaBlockReadCntMetric, 1);
+
       // Check cache for block.  If found return.
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
@@ -224,7 +224,7 @@ public class HFileReaderV1 extends Abstr
               cacheConf.shouldCacheDataOnRead());
         if (cachedBlock != null) {
           cacheHits.incrementAndGet();
-          HRegion.incrNumericMetric(this.fsMetaBlockReadCacheHitCntMetric, 1);
+          getSchemaMetrics().updateOnCacheHit(BlockCategory.META, false);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Cache Miss, please load.
@@ -233,13 +233,13 @@ public class HFileReaderV1 extends Abstr
       HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
           nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
           true);
-      hfileBlock.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.META);
 
       long delta = System.nanoTime() - startTimeNs;
-      HRegion.incrTimeVaryingMetric(fsReadTimeNanoMetric, delta);
       HFile.readTimeNano.addAndGet(delta);
       HFile.readOps.incrementAndGet();
+      getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
 
       // Cache the block
       if (cacheConf.shouldCacheDataOnRead() && cacheBlock) {
@@ -281,12 +281,6 @@ public class HFileReaderV1 extends Abstr
     synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
       blockLoads.incrementAndGet();
 
-      if (isCompaction) {
-        HRegion.incrNumericMetric(this.compactionBlockReadCntMetric, 1);
-      } else {
-        HRegion.incrNumericMetric(this.fsBlockReadCntMetric, 1);
-      }
-
       // Check cache for block.  If found return.
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
@@ -294,15 +288,8 @@ public class HFileReaderV1 extends Abstr
               cacheConf.shouldCacheDataOnRead());
         if (cachedBlock != null) {
           cacheHits.incrementAndGet();
-
-          if (isCompaction) {
-            HRegion.incrNumericMetric(
-                this.compactionBlockReadCacheHitCntMetric, 1);
-          } else {
-            HRegion.incrNumericMetric(
-                this.fsBlockReadCacheHitCntMetric, 1);
-          }
-
+          getSchemaMetrics().updateOnCacheHit(BlockCategory.DATA,
+              isCompaction);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Carry on, please load.
@@ -324,18 +311,15 @@ public class HFileReaderV1 extends Abstr
 
       HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
           - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
-      hfileBlock.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.DATA);
       ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
 
       long delta = System.nanoTime() - startTimeNs;
       HFile.readTimeNano.addAndGet(delta);
       HFile.readOps.incrementAndGet();
-      if (isCompaction) {
-        HRegion.incrTimeVaryingMetric(this.compactionReadTimeNanoMetric, delta);
-      } else {
-        HRegion.incrTimeVaryingMetric(this.fsReadTimeNanoMetric, delta);
-      }
+      getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction,
+          delta);
 
       // Cache the block
       if (cacheConf.shouldCacheDataOnRead() && cacheBlock) {
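
Stepping back from the hunks above: the V1 reader's pattern is now uniform
for meta and data blocks, i.e. count a hit on cache lookup success, otherwise
time the filesystem read and record a miss with the elapsed nanoseconds. A
condensed, hypothetical skeleton of that shape (only the updateOnCacheHit and
updateOnCacheMiss calls come from this patch; all other names are stand-ins):

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical skeleton of the cache-then-load pattern.
    public final class ReadPathSketch {
      private final AtomicLong hitCount = new AtomicLong();
      private final AtomicLong missReadNanos = new AtomicLong();

      Object readBlock(Object cacheKey, boolean isCompaction) {
        Object cached = lookUpInCache(cacheKey);
        if (cached != null) {
          // getSchemaMetrics().updateOnCacheHit(category, isCompaction)
          hitCount.incrementAndGet();
          return cached;
        }
        long startTimeNs = System.nanoTime();
        Object loaded = readFromFilesystem(cacheKey);
        // getSchemaMetrics().updateOnCacheMiss(category, isCompaction, delta)
        missReadNanos.addAndGet(System.nanoTime() - startTimeNs);
        return loaded;
      }

      private Object lookUpInCache(Object key) { return null; }     // stub
      private Object readFromFilesystem(Object key) { return key; } // stub
    }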

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Tue Nov 15 01:26:05 2011
@@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
 
@@ -170,7 +170,6 @@ public class HFileReaderV2 extends Abstr
     // single-level.
     synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
       metaLoads.incrementAndGet();
-      HRegion.incrNumericMetric(fsMetaBlockReadCntMetric, 1);
 
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
@@ -184,7 +183,7 @@ public class HFileReaderV2 extends Abstr
           // Return a distinct 'shallow copy' of the block,
           // so pos does not get messed by the scanner
           cacheHits.incrementAndGet();
-          HRegion.incrNumericMetric(fsMetaBlockReadCacheHitCntMetric, 1);
+          getSchemaMetrics().updateOnCacheHit(BlockCategory.META, false);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Cache Miss, please load.
@@ -192,12 +191,12 @@ public class HFileReaderV2 extends Abstr
 
       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
           blockSize, -1, true);
-      metaBlock.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(metaBlock);
 
       long delta = System.nanoTime() - startTimeNs;
-      HRegion.incrTimeVaryingMetric(fsReadTimeNanoMetric, delta);
       HFile.readTimeNano.addAndGet(delta);
       HFile.readOps.incrementAndGet();
+      getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
 
       // Cache the block
       if (cacheBlock) {
@@ -251,14 +250,11 @@ public class HFileReaderV2 extends Abstr
         HFileBlock cachedBlock =
           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
         if (cachedBlock != null) {
+          BlockCategory blockCategory =
+              cachedBlock.getBlockType().getCategory();
           cacheHits.incrementAndGet();
 
-          if (isCompaction) {
-            HRegion.incrNumericMetric(
-                this.compactionBlockReadCacheHitCntMetric, 1);
-          } else {
-            HRegion.incrNumericMetric(this.fsBlockReadCacheHitCntMetric, 1);
-          }
+          getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
 
           if (cachedBlock.getBlockType() == BlockType.DATA)
             HFile.dataBlockReadCnt.incrementAndGet();
@@ -271,16 +267,13 @@ public class HFileReaderV2 extends Abstr
       long startTimeNs = System.nanoTime();
       HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
           onDiskBlockSize, -1, pread);
-      dataBlock.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(dataBlock);
+      BlockCategory blockCategory = dataBlock.getBlockType().getCategory();
 
       long delta = System.nanoTime() - startTimeNs;
       HFile.readTimeNano.addAndGet(delta);
       HFile.readOps.incrementAndGet();
-      if (isCompaction) {
-        HRegion.incrTimeVaryingMetric(this.compactionReadTimeNanoMetric, delta);
-      } else {
-        HRegion.incrTimeVaryingMetric(this.fsReadTimeNanoMetric, delta);
-      }
+      getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
 
       // Cache the block
       if (cacheBlock) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Tue Nov 15 01:26:05 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -146,6 +147,7 @@ public class HFileWriterV1 extends Abstr
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
         blockSize, compress, comparator);
+    SchemaMetrics.configureGlobally(conf);
   }
 
   /** Constructor that takes a stream. */
@@ -204,7 +206,7 @@ public class HFileWriterV1 extends Abstr
       HFileBlock cBlock = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
           ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
-      cBlock.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(cBlock);
       cacheConf.getBlockCache().cacheBlock(
           HFile.getBlockCacheKey(name, blockBegin), cBlock);
       baosDos.close();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Tue Nov 15 01:26:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -136,6 +137,7 @@ public class HFileWriterV2 extends Abstr
       final KeyComparator comparator) throws IOException {
     super(cacheConf, createOutputStream(conf, fs, path), path,
         blockSize, compressAlgo, comparator);
+    SchemaMetrics.configureGlobally(conf);
     finishInit(conf);
   }
 
@@ -177,8 +179,17 @@ public class HFileWriterV2 extends Abstr
 
     // Meta data block index writer
     metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
-
     LOG.debug("Initialized with " + cacheConf);
+
+    if (isSchemaConfigured()) {
+      schemaConfigurationChanged();
+    }
+  }
+
+  @Override
+  protected void schemaConfigurationChanged() {
+    passSchemaMetricsTo(dataBlockIndexWriter);
+    passSchemaMetricsTo(metaBlockIndexWriter);
   }
 
   /**
@@ -221,7 +232,7 @@ public class HFileWriterV2 extends Abstr
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
-      blockForCaching.setColumnFamilyName(this.getColumnFamilyName());
+      passSchemaMetricsTo(blockForCaching);
       cacheConf.getBlockCache().cacheBlock(
           HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
     }
@@ -243,7 +254,7 @@ public class HFileWriterV2 extends Abstr
         if (cacheThisBlock) {
           // Cache this block on write.
           HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
-          cBlock.setColumnFamilyName(this.getColumnFamilyName());
+          passSchemaMetricsTo(cBlock);
           cacheConf.getBlockCache().cacheBlock(
               HFile.getBlockCacheKey(name, offset), cBlock);
         }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Tue Nov 15 01:26:05 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -255,9 +255,7 @@ public class LruBlockCache implements Bl
    * Cache the block with the specified name and buffer.
    * <p>
    * It is assumed this will NEVER be called on an already cached block.  If
-   * that is done, it is assumed that you are reinserting the same exact
-   * block due to a race condition and will update the buffer but not modify
-   * the size of the cache.
+   * that is done, an exception will be thrown.
    * @param blockName block name
    * @param buf block buffer
    * @param inMemory if block is in-memory
@@ -303,13 +301,11 @@ public class LruBlockCache implements Bl
     if (evict) {
       heapsize *= -1;
     }
-    if (cb.getBuffer() instanceof HFileBlockInfo) {
-      HFileBlockInfo cb_hfbi = (HFileBlockInfo) cb.getBuffer();
-      HRegion.incrNumericPersistentMetric(cb_hfbi.getColumnFamilyName()
-          + ".blockCacheSize", heapsize);
-      HRegion.incrNumericPersistentMetric("bt."
-          + cb_hfbi.getBlockType().getMetricName() + ".blockCacheSize",
-          heapsize);
+    Cacheable cachedBlock = cb.getBuffer();
+    SchemaMetrics schemaMetrics = cachedBlock.getSchemaMetrics();
+    if (schemaMetrics != null) {
+      schemaMetrics.updateOnCachePutOrEvict(
+          cachedBlock.getBlockType().getCategory(), heapsize, evict);
     }
     return size.addAndGet(heapsize);
   }
@@ -317,8 +313,10 @@ public class LruBlockCache implements Bl
   /**
    * Get the buffer of the block with the specified name.
    * @param blockName block name
+   * @param caching true if the caller caches blocks on cache misses
    * @return buffer of specified block name, or null if not in cache
    */
+  @Override
   public Cacheable getBlock(String blockName, boolean caching) {
     CachedBlock cb = map.get(blockName);
     if(cb == null) {
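
The updateSizeMetrics() hunk above keeps one code path for both directions:
the heap-size delta is negated on eviction and then handed, along with the
evict flag, to the block's SchemaMetrics. A minimal sketch of that
signed-delta accounting, with hypothetical names (updateOnCachePutOrEvict
itself is in part 2 of this mail):

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the per-schema cached-size counter.
    public final class CacheSizeSketch {
      private final AtomicLong cachedSize = new AtomicLong();

      /** Applies a signed delta: positive on put, negative on evict. */
      long updateSizeMetrics(long blockHeapSize, boolean evict) {
        long delta = evict ? -blockHeapSize : blockHeapSize;
        // The real code forwards the signed delta to
        // SchemaMetrics.updateOnCachePutOrEvict(category, delta, evict).
        return cachedSize.addAndGet(delta);
      }
    }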

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Nov 15 01:26:05 2011
@@ -181,6 +181,11 @@ public class HRegion implements HeapSize
   private ClassToInstanceMap<CoprocessorProtocol>
       protocolHandlers = MutableClassToInstanceMap.create();
 
+  /**
+   * Temporary subdirectory of the region directory used for compaction output.
+   */
+  public static final String REGION_TEMP_SUBDIR = ".tmp";
+
  //These variables are just used for getting data out of the region, to test on
   //client side
   // private int numStores = 0;
@@ -325,6 +330,30 @@ public class HRegion implements HeapSize
     oldVal.addAndGet(amount);
   }
 
+  public static long getNumericMetric(String key) {
+    AtomicLong m = numericMetrics.get(key);
+    if (m == null)
+      return 0;
+    return m.get();
+  }
+
+  public static Pair<Long, Integer> getTimeVaryingMetric(String key) {
+    Pair<AtomicLong, AtomicInteger> pair = timeVaryingMetrics.get(key);
+    if (pair == null) {
+      return new Pair<Long, Integer>(0L, 0);
+    }
+
+    return new Pair<Long, Integer>(pair.getFirst().get(),
+        pair.getSecond().get());
+  }
+
+  static long getNumericPersistentMetric(String key) {
+    AtomicLong m = numericPersistentMetrics.get(key);
+    if (m == null)
+      return 0;
+    return m.get();
+  }
+
   /**
    * Should only be used for testing purposes
    */
@@ -935,11 +964,11 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * Get the temporary diretory for this region. This directory
+   * Get the temporary directory for this region. This directory
    * will have its contents removed when the region is reopened.
    */
   Path getTmpDir() {
-    return new Path(getRegionDir(), ".tmp");
+    return new Path(getRegionDir(), REGION_TEMP_SUBDIR);
   }
 
   void triggerMajorCompaction() {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 15 01:26:05 2011
@@ -125,7 +125,9 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
@@ -243,6 +245,8 @@ public class HRegionServer implements HR
   private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
 
   private RegionServerMetrics metrics;
+
+  @SuppressWarnings("unused")
   private RegionServerDynamicMetrics dynamicMetrics;
 
   // Compactions
@@ -289,6 +293,7 @@ public class HRegionServer implements HR
 
   // Instance of the hbase executor service.
   private ExecutorService service;
+  @SuppressWarnings("unused")
 
   // Replication services. If no replication, this handler will be null.
   private ReplicationSourceService replicationSourceHandler;
@@ -1267,24 +1272,6 @@ public class HRegionServer implements HR
     }
   }
 
-  /**
-   * Help function for metrics() that increments a map value if it exists.
-   *
-   * @param map
-   *          The map to work with
-   * @param key
-   *          the string key
-   * @param val
-   *          the value to add or set the map key to
-   */
-  protected void incrMap(Map<String, MutableDouble> map, String key, double val) {
-    if (map.get(key) != null) {
-      map.get(key).add(val);
-    } else {
-      map.put(key, new MutableDouble(val));
-    }
-  }
-
   protected void metrics() {
     this.metrics.regions.set(this.onlineRegions.size());
     this.metrics.incrementRequests(this.requestCount.get());
@@ -1303,19 +1290,13 @@ public class HRegionServer implements HR
     long totalStaticIndexSize = 0;
     long totalStaticBloomSize = 0;
 
-    long tmpfiles;
-    long tmpindex;
-    long tmpfilesize;
-    long tmpbloomsize;
-    long tmpstaticsize;
-    String cfname;
-
     // Note that this is a map of Doubles instead of Longs. This is because we
     // do effective integer division, which would perhaps truncate more than it
     // should because we do it only on one part of our sum at a time. Rather
     // than dividing at the end, where it is difficult to know the proper
     // factor, everything is exact then truncated.
-    Map<String, MutableDouble> tempVals = new HashMap<String, MutableDouble>();
+    final Map<String, MutableDouble> tempVals =
+        new HashMap<String, MutableDouble>();
 
     for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
       HRegion r = e.getValue();
@@ -1325,39 +1306,61 @@ public class HRegionServer implements HR
       synchronized (r.stores) {
         stores += r.stores.size();
         for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
-          Store store = ee.getValue();
-          tmpfiles = store.getStorefilesCount();
-          tmpindex = store.getStorefilesIndexSize();
-          tmpfilesize = store.getStorefilesSize();
-          tmpbloomsize = store.getTotalStaticBloomSize();
-          tmpstaticsize = store.getTotalStaticIndexSize();
-
-          // Note that there is only one store per CF so setting is safe
-          cfname = "cf." + store.toString();
-          this.incrMap(tempVals, cfname + ".storeFileCount", tmpfiles);
-          this.incrMap(tempVals, cfname + ".storeFileIndexSizeMB",
-              (tmpindex / (1024.0 * 1024)));
-          this.incrMap(tempVals, cfname + ".storeFileSizeMB",
-              (tmpfilesize / (1024.0 * 1024)));
-          this.incrMap(tempVals, cfname + ".staticBloomSizeKB",
-              (tmpbloomsize / 1024.0));
-          this.incrMap(tempVals, cfname + ".memstoreSizeMB",
-              (store.getMemStoreSize() / (1024.0 * 1024)));
-          this.incrMap(tempVals, cfname + ".staticIndexSizeKB",
-              tmpstaticsize / 1024.0);
-
-          storefiles += tmpfiles;
-          storefileIndexSize += tmpindex;
-          totalStaticIndexSize += tmpstaticsize;
-          totalStaticBloomSize += tmpbloomsize;
+            final Store store = ee.getValue();
+            final SchemaMetrics schemaMetrics = store.getSchemaMetrics();
+
+            {
+              long tmpStorefiles = store.getStorefilesCount();
+              schemaMetrics.accumulateStoreMetric(tempVals,
+                  StoreMetricType.STORE_FILE_COUNT, tmpStorefiles);
+              storefiles += tmpStorefiles;
+            }
+
+
+            {
+              long tmpStorefileIndexSize = store.getStorefilesIndexSize();
+              schemaMetrics.accumulateStoreMetric(tempVals,
+                  StoreMetricType.STORE_FILE_INDEX_SIZE,
+                  (long) (tmpStorefileIndexSize / (1024.0 * 1024)));
+              storefileIndexSize += tmpStorefileIndexSize;
+            }
+
+            {
+              long tmpStorefilesSize = store.getStorefilesSize();
+              schemaMetrics.accumulateStoreMetric(tempVals,
+                  StoreMetricType.STORE_FILE_SIZE_MB,
+                  (long) (tmpStorefilesSize / (1024.0 * 1024)));
+            }
+
+            {
+              long tmpStaticBloomSize = store.getTotalStaticBloomSize();
+              schemaMetrics.accumulateStoreMetric(tempVals,
+                  StoreMetricType.STATIC_BLOOM_SIZE_KB,
+                  (long) (tmpStaticBloomSize / 1024.0));
+              totalStaticBloomSize += tmpStaticBloomSize;
+            }
+
+            {
+              long tmpStaticIndexSize = store.getTotalStaticIndexSize();
+              schemaMetrics.accumulateStoreMetric(tempVals,
+                  StoreMetricType.STATIC_INDEX_SIZE_KB,
+                  (long) (tmpStaticIndexSize / 1024.0));
+              totalStaticIndexSize += tmpStaticIndexSize;
+            }
+
+            schemaMetrics.accumulateStoreMetric(tempVals,
+                StoreMetricType.MEMSTORE_SIZE_MB,
+                (long) (store.getMemStoreSize() / (1024.0 * 1024)));
         }
       }
 
       hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
     }
+
     for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
       HRegion.setNumericMetric(e.getKey(), e.getValue().longValue());
     }
+
     this.metrics.stores.set(stores);
     this.metrics.storefiles.set(storefiles);
     this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
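
The MutableDouble accumulator kept above exists so that division stays exact
until a single final truncation. A small worked example of why that matters,
assuming two store files of 1.6 MB each:

    public final class TruncationSketch {
      public static void main(String[] args) {
        double fileA = 1.6, fileB = 1.6;              // sizes in MB
        long perAddend = (long) fileA + (long) fileB; // 1 + 1 = 2
        long once = (long) (fileA + fileB);           // (long) 3.2 = 3
        System.out.println(perAddend + " vs " + once);
      }
    }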

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Nov 15 01:26:05 2011
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
@@ -90,7 +92,7 @@ import com.google.common.collect.Lists;
  * <p>Locking and transactions are handled at a higher level.  This API should
  * not be called directly but by an HRegion manager.
  */
-public class Store implements HeapSize {
+public class Store extends SchemaConfigured implements HeapSize {
   static final Log LOG = LogFactory.getLog(Store.class);
   protected final MemStore memstore;
   // This stores directory in the filesystem.
@@ -160,6 +162,8 @@ public class Store implements HeapSize {
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
     FileSystem fs, Configuration conf)
   throws IOException {
+    super(conf, region.getTableDesc().getNameAsString(),
+        Bytes.toString(family.getName()));
     HRegionInfo info = region.regionInfo;
     this.fs = fs;
     this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
@@ -193,7 +197,7 @@ public class Store implements HeapSize {
         family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
         this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.storeNameStr = Bytes.toString(this.family.getName());
+    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -503,7 +507,6 @@ public class Store implements HeapSize {
       MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer;
-    String fileName;
     long flushed = 0;
     Path pathName;
     // Don't flush if there are no entries.
@@ -598,8 +601,8 @@ public class Store implements HeapSize {
     // retrieved from HRegion.recentFlushes, which is set within
     // HRegion.internalFlushcache, which indirectly calls this to actually do
     // the flushing through the StoreFlusherImpl class
-    HRegion.incrNumericPersistentMetric("cf." + this.toString() + ".flushSize",
-        flushedSize.longValue());
+    getSchemaMetrics().updatePersistentStoreMetric(
+        SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushedSize.longValue());
     if (LOG.isInfoEnabled()) {
       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
@@ -625,9 +628,17 @@ public class Store implements HeapSize {
   private StoreFile.Writer createWriterInTmp(int maxKeyCount,
     Compression.Algorithm compression)
   throws IOException {
-    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
-        compression, this.comparator, this.conf, this.cacheConf,
-        this.family.getBloomFilterType(), maxKeyCount);
+    StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
+        blocksize, compression, comparator, conf, cacheConf,
+        family.getBloomFilterType(), maxKeyCount);
+    if (w.writer instanceof SchemaConfigured) {
+      // The store file writer's path does not include the CF name, so we need
+      // to configure the HFile writer directly.
+      SchemaConfigured sc = (SchemaConfigured) w.writer;
+      SchemaConfigured.resetSchemaMetricsConf(sc);
+      passSchemaMetricsTo(sc);
+    }
+    return w;
   }
 
   /*
@@ -1927,10 +1938,11 @@ public class Store implements HeapSize {
     return this.cacheConf;
   }
 
-  public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-      + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
-      + (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
-      + Bytes.SIZEOF_BOOLEAN);
+  public static final long FIXED_OVERHEAD =
+      ClassSize.align(new SchemaConfigured().heapSize()
+          + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+          + (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
+          + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
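
For illustration, a minimal sketch (not part of this commit) of the re-targeting pattern used in createWriterInTmp() above; the table/CF names "t1" and "cf1" are invented. A writer created under the region's temp directory cannot deduce its CF from its path, so the store wipes the writer's schema configuration and hands down its own table/CF and metrics:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;

    public class RetargetSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // The store knows its table/CF; "t1" and "cf1" are made-up names.
        SchemaConfigured store = new SchemaConfigured(conf, "t1", "cf1");
        // A writer created in the region's temp directory cannot deduce its
        // CF from its path, so it starts out unconfigured.
        SchemaConfigured writer = new SchemaConfigured();
        // Wipe any stale table/CF guess, then hand down the store's identity.
        SchemaConfigured.resetSchemaMetricsConf(writer);
        store.passSchemaMetricsTo(writer);
        System.out.println(writer.schemaConfAsJSON());
        // Prints: {"tableName":"t1","cfName":"cf1"}
      }
    }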

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1201994&r1=1201993&r2=1201994&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Nov 15 01:26:05 2011
@@ -48,12 +48,13 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -239,6 +240,8 @@ public class StoreFile {
     } else {
       this.modificationTimeStamp = 0;
     }
+
+    SchemaMetrics.configureGlobally(conf);
   }
 
   /**
@@ -1060,7 +1063,7 @@ public class StoreFile {
   /**
    * Reader for a StoreFile.
    */
-  public static class Reader {
+  public static class Reader extends SchemaConfigured {
     static final Log LOG = LogFactory.getLog(Reader.class.getName());
 
     protected BloomFilter generalBloomFilter = null;
@@ -1069,21 +1072,13 @@ public class StoreFile {
     private final HFile.Reader reader;
     protected TimeRangeTracker timeRangeTracker = null;
     protected long sequenceID = -1;
-    private final String bloomAccessedMetric;
-    private final String bloomSkippedMetric;
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
         throws IOException {
+      super(path);
       reader = HFile.createReader(fs, path, cacheConf);
-
-      // prepare the text (key) for the metrics
-      bloomAccessedMetric = reader.getColumnFamilyName() +
-          ".keyMaybeInBloomCnt";
-      bloomSkippedMetric = reader.getColumnFamilyName() +
-          ".keyNotInBloomCnt";
-
       bloomFilterType = BloomType.NONE;
     }
 
@@ -1092,8 +1087,6 @@ public class StoreFile {
      */
     Reader() {
       this.reader = null;
-      bloomAccessedMetric = "";
-      bloomSkippedMetric = "";
     }
 
     public RawComparator<byte []> getComparator() {
@@ -1353,10 +1346,7 @@ public class StoreFile {
                 && bloomFilter.contains(key, 0, key.length, bloom);
           }
 
-          if (exists)
-            HRegion.incrNumericMetric(bloomAccessedMetric, 1);
-          else
-            HRegion.incrNumericMetric(bloomSkippedMetric, 1);
+          getSchemaMetrics().updateBloomMetrics(exists);
           return exists;
         }
       } catch (IOException e) {
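
Since the Reader now extends SchemaConfigured and calls super(path), the table and CF names are deduced from the HFile path itself, and updateBloomMetrics(exists) replaces the flat per-CF keyMaybeInBloomCnt/keyNotInBloomCnt counters removed above. A minimal sketch of the path deduction, with every path component invented for illustration:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;

    public class PathDeductionSketch {
      public static void main(String[] args) {
        // Assumed layout: .../<table>/<encoded region>/<cf>/<hfile>
        Path path = new Path("/hbase/t1/5e1a93d6c0/cf1/3a8f1b2c4d");
        SchemaConfigured sc = new SchemaConfigured(path);
        System.out.println(sc.schemaConfAsJSON());
        // Prints: {"tableName":"t1","cfName":"cf1"}
      }
    }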

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1201994&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Tue Nov 15 01:26:05 2011
@@ -0,0 +1,237 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * A base class for objects that are associated with a particular table and
+ * column family. Provides a way to obtain the schema metrics object.
+ * <p>
+ * Due to the variety of things that can be associated with a table/CF, there
+ * are many ways to initialize this base class, either through a constructor
+ * or from another similar object. For example, an HFile reader configures
+ * the HFile blocks it reads with its own table/CF name.
+ */
+public class SchemaConfigured implements HeapSize, SchemaAware {
+  private static final Log LOG = LogFactory.getLog(SchemaConfigured.class);
+
+  // These are not final because we set them at runtime e.g. for HFile blocks.
+  private String cfName;
+  private String tableName;
+
+  /**
+   * Schema metrics. Can only be initialized when we know our column family
+   * name, table name, and have had a chance to take a look at the
+   * configuration (in {@link SchemaMetrics#configureGlobally(Configuration)})
+   * so we know whether we are using per-table metrics. Therefore, initialized
+   * lazily. We don't make this volatile because even if a thread sees a stale
+   * value of null, it will be re-initialized to the same value that other
+   * threads see.
+   */
+  private SchemaMetrics schemaMetrics;
+
+  /** A helper constructor that configures the "use table name" flag. */
+  private SchemaConfigured(Configuration conf) {
+    if (conf != null) {
+      SchemaMetrics.configureGlobally(conf);
+      // Even though we now know if table-level metrics are used, we can't
+      // initialize schemaMetrics yet, because CF and table name are only known
+      // to the calling constructor.
+    }
+  }
+
+  /**
+   * Default constructor. Only use this when the table and column family
+   * names are not known at construction time (e.g. for HFile blocks).
+   */
+  public SchemaConfigured() {
+  }
+
+  /**
+   * Initialize table and column family names from an HFile path. If
+   * configuration is null,
+   * {@link SchemaMetrics#configureGlobally(Configuration)} should have been
+   * called already.
+   */
+  public SchemaConfigured(Configuration conf, Path path) {
+    this(conf);
+
+    if (path != null) {
+      String[] splits = path.toString().split("/");
+      int numPathLevels = splits.length;
+      if (numPathLevels > 0 && splits[0].isEmpty()) {
+        // This path starts with a '/'.
+        --numPathLevels;
+      }
+      if (numPathLevels < HFile.MIN_NUM_HFILE_PATH_LEVELS) {
+        LOG.warn("Could not determine table and column family of the HFile "
+            + "path " + path + ". Expecting at least "
+            + HFile.MIN_NUM_HFILE_PATH_LEVELS + " path components.");
+        path = null;
+      } else {
+        cfName = splits[splits.length - 2];
+        if (cfName.equals(HRegion.REGION_TEMP_SUBDIR)) {
+          // This is probably a compaction output file. We will set the real CF
+          // name later.
+          cfName = null;
+        } else {
+          cfName = cfName.intern();
+        }
+        tableName = splits[splits.length - 4].intern();
+        return;
+      }
+    }
+
+    // This might also happen if we were passed an incorrect path.
+    cfName = SchemaMetrics.UNKNOWN;
+    tableName = SchemaMetrics.UNKNOWN;
+  }
+
+  /**
+   * Used when we know an HFile path to deduce table and CF name from, but do
+   * not have a configuration.
+   * @param path an HFile path
+   */
+  public SchemaConfigured(Path path) {
+    this(null, path);
+  }
+
+  /**
+   * Used when we know the table and column family names. If configuration
+   * is null, {@link SchemaMetrics#configureGlobally(Configuration)} should
+   * have been called already.
+   */
+  public SchemaConfigured(Configuration conf, String tableName,
+      String cfName) {
+    this(conf);
+    this.tableName = tableName.intern();
+    this.cfName = cfName.intern();
+  }
+
+  public SchemaConfigured(SchemaAware that) {
+    tableName = that.getTableName().intern();
+    cfName = that.getColumnFamilyName().intern();
+    schemaMetrics = that.getSchemaMetrics();
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String getColumnFamilyName() {
+    return cfName;
+  }
+
+  @Override
+  public SchemaMetrics getSchemaMetrics() {
+    if (schemaMetrics == null) {
+      if (tableName == null || cfName == null) {
+        throw new IllegalStateException("Schema metrics requested before " +
+            "table/CF name initialization: " + schemaConfAsJSON());
+      }
+      schemaMetrics = SchemaMetrics.getInstance(tableName, cfName);
+    }
+    return schemaMetrics;
+  }
+
+  /**
+   * Configures the given object (e.g. an HFile block) with the current table
+   * and column family name, and the associated collection of metrics. Please
+   * note that this method configures the <b>other</b> object, not <b>this</b>
+   * object.
+   */
+  public void passSchemaMetricsTo(SchemaConfigured target) {
+    if (!isSchemaConfigured()) {
+      // Cannot configure another object if we are not configured ourselves.
+      throw new IllegalStateException("Table name/CF not initialized");
+    }
+
+    if (conflictingWith(target)) {
+      // Make sure we don't try to change table or CF name.
+      throw new IllegalArgumentException("Trying to change table name to \"" +
+          tableName + "\", CF name to \"" + cfName + "\" from " +
+          target.schemaConfAsJSON());
+    }
+    target.tableName = tableName.intern();
+    target.cfName = cfName.intern();
+    target.schemaMetrics = schemaMetrics;
+    target.schemaConfigurationChanged();
+  }
+
+  /**
+   * Reset the schema metrics configuration of this particular instance. Used
+   * when we legitimately need to re-initialize the object with another
+   * table/CF. This is a static method because its use is discouraged and
+   * reserved for when it is really necessary (e.g. writing HFiles in a temp
+   * directory on compaction).
+   */
+  public static void resetSchemaMetricsConf(SchemaConfigured target) {
+    target.tableName = null;
+    target.cfName = null;
+    target.schemaMetrics = null;
+  }
+
+  @Override
+  public long heapSize() {
+    // We don't count table name and column family name characters because
+    // these strings are shared among many objects.
+    return ClassSize.align(ClassSize.OBJECT + 3 * ClassSize.REFERENCE);
+  }
+
+  public String schemaConfAsJSON() {
+    return "{\"tableName\":\"" + tableName + "\",\"cfName\":\"" + cfName
+        + "\"}";
+  }
+
+  protected boolean isSchemaConfigured() {
+    return tableName != null && cfName != null;
+  }
+
+  /**
+   * Determines whether the current object's table/CF settings conflict with
+   * the other object's table and CF. If the other object's table/CF are
+   * undefined, they are not considered to be in conflict. Used to
+   * sanity-check configuring the other object with this object's table/CF.
+   */
+  boolean conflictingWith(SchemaConfigured other) {
+    return (other.tableName != null && !tableName.equals(other.tableName)) ||
+        (other.cfName != null && !cfName.equals(other.cfName));
+  }
+
+  /**
+   * A hook method called when schema configuration changes. Can be used to
+   * update schema-aware member fields.
+   */
+  protected void schemaConfigurationChanged() {
+  }
+
+}
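
A quick usage sketch (again with invented names, not part of this commit) of the two guard rails in this class: getSchemaMetrics() refuses to run before the table/CF are known, and passSchemaMetricsTo() refuses to silently re-target an already-configured object:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;

    public class GuardSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        SchemaConfigured a = new SchemaConfigured(conf, "t1", "cf1");
        SchemaConfigured b = new SchemaConfigured(conf, "t2", "cf2");
        try {
          a.passSchemaMetricsTo(b); // b is already bound to t2/cf2
        } catch (IllegalArgumentException e) {
          // Expected: re-targeting requires resetSchemaMetricsConf(b) first.
        }
        try {
          new SchemaConfigured().getSchemaMetrics(); // table/CF still unknown
        } catch (IllegalStateException e) {
          // Expected: metrics are available only after table/CF are set.
        }
      }
    }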


