hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1445918 [8/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/mai...
Date Wed, 13 Feb 2013 20:58:32 GMT
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Wed Feb 13 20:58:23 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
@@ -84,28 +86,31 @@ public class SingleColumnValueExcludeFil
    * @param qualifier
    * @param compareOp
    * @param comparator
-   * @param foundColumn
-   * @param matchedColumn
    * @param filterIfMissing
    * @param latestVersionOnly
    */
-  protected SingleColumnValueExcludeFilter(final byte[] family, final byte [] qualifier,
-    final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
-    final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
-    super(family,qualifier,compareOp,comparator,foundColumn,
-      matchedColumn,filterIfMissing,latestVersionOnly);
-  }
-
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
+  protected SingleColumnValueExcludeFilter(final byte[] family, final byte[] qualifier,
+      final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+      final boolean latestVersionOnly) {
+    super(family, qualifier, compareOp, comparator, filterIfMissing, latestVersionOnly);
+  }
+
+  // We cleaned result row in FilterRow to be consistent with scanning process.
+  public boolean hasFilterRow() {
+   return true;
+  }
+
+  // Here we remove from row all key values from testing column
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator it = kvs.iterator();
+    while (it.hasNext()) {
+      KeyValue kv = (KeyValue)it.next();
       // If the current column is actually the tested column,
       // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
+      if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
       }
     }
-    return superRetCode;
   }
 
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
@@ -157,11 +162,10 @@ public class SingleColumnValueExcludeFil
       throw new DeserializationException(ioe);
     }
 
-    return new SingleColumnValueExcludeFilter(
-      parentProto.hasColumnFamily()?parentProto.getColumnFamily().toByteArray():null,
-      parentProto.hasColumnQualifier()?parentProto.getColumnQualifier().toByteArray():null,
-      compareOp, comparator, parentProto.getFoundColumn(),parentProto.getMatchedColumn(),
-      parentProto.getFilterIfMissing(),parentProto.getLatestVersionOnly());
+    return new SingleColumnValueExcludeFilter(parentProto.hasColumnFamily() ? parentProto
+        .getColumnFamily().toByteArray() : null, parentProto.hasColumnQualifier() ? parentProto
+        .getColumnQualifier().toByteArray() : null, compareOp, comparator, parentProto
+        .getFilterIfMissing(), parentProto.getLatestVersionOnly());
   }
 
   /**

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Wed Feb 13 20:58:23 2013
@@ -128,17 +128,13 @@ public class SingleColumnValueFilter ext
    * @param qualifier
    * @param compareOp
    * @param comparator
-   * @param foundColumn
-   * @param matchedColumn
    * @param filterIfMissing
    * @param latestVersionOnly
    */
-  protected SingleColumnValueFilter(final byte[] family, final byte [] qualifier,
-    final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
-    final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
-    this(family,qualifier,compareOp,comparator);
-    this.foundColumn = foundColumn;
-    this.matchedColumn = matchedColumn;
+  protected SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
+      final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+      final boolean latestVersionOnly) {
+    this(family, qualifier, compareOp, comparator);
     this.filterIfMissing = filterIfMissing;
     this.latestVersionOnly = latestVersionOnly;
   }
@@ -313,8 +309,6 @@ public class SingleColumnValueFilter ext
     HBaseProtos.CompareType compareOp = CompareType.valueOf(this.compareOp.name());
     builder.setCompareOp(compareOp);
     builder.setComparator(ProtobufUtil.toComparator(this.comparator));
-    builder.setFoundColumn(this.foundColumn);
-    builder.setMatchedColumn(this.matchedColumn);
     builder.setFilterIfMissing(this.filterIfMissing);
     builder.setLatestVersionOnly(this.latestVersionOnly);
 
@@ -352,11 +346,10 @@ public class SingleColumnValueFilter ext
       throw new DeserializationException(ioe);
     }
 
-    return new SingleColumnValueFilter(
-      proto.hasColumnFamily()?proto.getColumnFamily().toByteArray():null,
-      proto.hasColumnQualifier()?proto.getColumnQualifier().toByteArray():null,
-      compareOp, comparator, proto.getFoundColumn(),proto.getMatchedColumn(),
-      proto.getFilterIfMissing(),proto.getLatestVersionOnly());
+    return new SingleColumnValueFilter(proto.hasColumnFamily() ? proto.getColumnFamily()
+        .toByteArray() : null, proto.hasColumnQualifier() ? proto.getColumnQualifier()
+        .toByteArray() : null, compareOp, comparator, proto.getFilterIfMissing(), proto
+        .getLatestVersionOnly());
   }
 
   /**
@@ -373,12 +366,19 @@ public class SingleColumnValueFilter ext
       && Bytes.equals(this.getQualifier(), other.getQualifier())
       && this.compareOp.equals(other.compareOp)
       && this.getComparator().areSerializedFieldsEqual(other.getComparator())
-      && this.foundColumn == other.foundColumn
-      && this.matchedColumn == other.matchedColumn
       && this.getFilterIfMissing() == other.getFilterIfMissing()
       && this.getLatestVersionOnly() == other.getLatestVersionOnly();
   }
 
+  /**
+   * The only CF this filter needs is given column family. So, it's the only essential
+   * column in whole scan. If filterIfMissing == false, all families are essential,
+   * because of possibility of skipping the rows without any data in filtered CF.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+  }
+
   @Override
   public String toString() {
     return String.format("%s (%s, %s, %s, %s)",

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Wed Feb 13 20:58:23 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Wed Feb 13 20:58:23 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Wed Feb 13 20:58:23 2013
@@ -33,13 +33,12 @@ import java.net.URI;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,8 +46,9 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * An encapsulation for the FileSystem object that hbase uses to access
@@ -259,7 +259,7 @@ public class HFileSystem extends FilterF
       final ReorderBlocks lrb, final Configuration conf) {
     return (ClientProtocol) Proxy.newProxyInstance
         (cp.getClass().getClassLoader(),
-            new Class[]{ClientProtocol.class},
+            new Class[]{ClientProtocol.class, Closeable.class},
             new InvocationHandler() {
               public Object invoke(Object proxy, Method method,
                                    Object[] args) throws Throwable {

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java Wed Feb 13 20:58:23 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io;
 import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
 import com.google.protobuf.ByteString;
 
@@ -56,7 +54,7 @@ import com.google.protobuf.ByteString;
  * references.  References are cleaned up by compactions.
  */
 @InterfaceAudience.Private
-public class Reference implements Writable {
+public class Reference {
   private byte [] splitkey;
   private Range region;
 
@@ -99,7 +97,6 @@ public class Reference implements Writab
 
   /**
    * Used by serializations.
-   * @deprecated Use the pb serializations instead.  Writables are going away.
    */
   @Deprecated
   // Make this private when it comes time to let go of this constructor.  Needed by pb serialization.
@@ -130,18 +127,14 @@ public class Reference implements Writab
     return "" + this.region;
   }
 
-  /**
-   * @deprecated Writables are going away. Use the pb serialization methods instead.
-   */
-  @Deprecated
-  public void write(DataOutput out) throws IOException {
-    // Write true if we're doing top of the file.
-    out.writeBoolean(isTopFileRegion(this.region));
-    Bytes.writeByteArray(out, this.splitkey);
+  public static boolean isTopFileRegion(final Range r) {
+    return r.equals(Range.top);
   }
 
   /**
    * @deprecated Writables are going away. Use the pb serialization methods instead.
+   * Remove in a release after 0.96 goes out.  This is here only to migrate
+   * old Reference files written with Writables before 0.96.
    */
   @Deprecated
   public void readFields(DataInput in) throws IOException {
@@ -151,10 +144,6 @@ public class Reference implements Writab
     this.splitkey = Bytes.readByteArray(in);
   }
 
-  public static boolean isTopFileRegion(final Range r) {
-    return r.equals(Range.top);
-  }
-
   public Path write(final FileSystem fs, final Path p)
   throws IOException {
     FSDataOutputStream out = fs.create(p, false);

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Wed Feb 13 20:58:23 2013
@@ -19,12 +19,8 @@
 
 package org.apache.hadoop.hbase.io;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,7 +35,7 @@ import org.apache.hadoop.hbase.util.Byte
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class TimeRange implements Writable {
+public class TimeRange {
   private long minStamp = 0L;
   private long maxStamp = Long.MAX_VALUE;
   private boolean allTime = false;
@@ -184,17 +180,4 @@ public class TimeRange implements Writab
     sb.append(this.minStamp);
     return sb.toString();
   }
-
-  //Writable
-  public void readFields(final DataInput in) throws IOException {
-    this.minStamp = in.readLong();
-    this.maxStamp = in.readLong();
-    this.allTime = in.readBoolean();
-  }
-
-  public void write(final DataOutput out) throws IOException {
-    out.writeLong(minStamp);
-    out.writeLong(maxStamp);
-    out.writeBoolean(this.allTime);
-  }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Wed Feb 13 20:58:23 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
  * Cache Key for use with implementations of {@link BlockCache}
  */
 @InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
   private final String hfileName;
   private final long offset;
   private final DataBlockEncoding encoding;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Wed Feb 13 20:58:23 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
   public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
       "hbase.rs.evictblocksonclose";
 
+  /**
+   * Configuration keys for Bucket cache
+   */
+  public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+  public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = 
+      "hbase.bucketcache.persistent.path";
+  public static final String BUCKET_CACHE_COMBINED_KEY = 
+      "hbase.bucketcache.combinedcache.enabled";
+  public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = 
+      "hbase.bucketcache.percentage.in.combinedcache";
+  public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+  public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = 
+      "hbase.bucketcache.writer.queuelength";
+  /**
+   * Defaults for Bucket cache
+   */
+  public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+  public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
   // Defaults
 
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
 
     // Calculate the amount of heap to give the heap.
     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-    long cacheSize = (long)(mu.getMax() * cachePercentage);
+    long lruCacheSize = (long) (mu.getMax() * cachePercentage);
     int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
         HFile.DEFAULT_BLOCKSIZE);
     long offHeapCacheSize =
       (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
           DirectMemoryUtils.getDirectMemorySize());
-    LOG.info("Allocating LruBlockCache with maximum size " +
-      StringUtils.humanReadableInt(cacheSize));
     if (offHeapCacheSize <= 0) {
-      globalBlockCache = new LruBlockCache(cacheSize,
-          StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+      String bucketCacheIOEngineName = conf
+          .get(BUCKET_CACHE_IOENGINE_KEY, null);
+      float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+      // A percentage of max heap size or a absolute value with unit megabytes
+      long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+          * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+          DEFAULT_BUCKET_CACHE_COMBINED);
+      BucketCache bucketCache = null;
+      if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+        int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+        int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+        String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+        float combinedPercentage = conf.getFloat(
+            BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+            DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+        if (combinedWithLru) {
+          lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+          bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+        }
+        try {
+          int ioErrorsTolerationDuration = conf.getInt(
+              "hbase.bucketcache.ioengine.errors.tolerated.duration",
+              BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+          bucketCache = new BucketCache(bucketCacheIOEngineName,
+              bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+              ioErrorsTolerationDuration);
+        } catch (IOException ioex) {
+          LOG.error("Can't instantiate bucket cache", ioex);
+          throw new RuntimeException(ioex);
+        }
+      }
+      LOG.info("Allocating LruBlockCache with maximum size "
+          + StringUtils.humanReadableInt(lruCacheSize));
+      LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+          StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+      lruCache.setVictimCache(bucketCache);
+      if (bucketCache != null && combinedWithLru) {
+        globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+      } else {
+        globalBlockCache = lruCache;
+      }
     } else {
-      globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+      globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
           StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
     }
     return globalBlockCache;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Wed Feb 13 20:58:23 2013
@@ -171,6 +171,22 @@ public class CacheStats {
     windowIndex = (windowIndex + 1) % numPeriodsInWindow;
   }
 
+  public long getSumHitCountsPastNPeriods() {
+    return sum(hitCounts);
+  }
+
+  public long getSumRequestCountsPastNPeriods() {
+    return sum(requestCounts);
+  }
+
+  public long getSumHitCachingCountsPastNPeriods() {
+    return sum(hitCachingCounts);
+  }
+
+  public long getSumRequestCachingCountsPastNPeriods() {
+    return sum(requestCachingCounts);
+  }
+
   public double getHitRatioPastNPeriods() {
     double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
     return Double.isNaN(ratio) ? 0 : ratio;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Wed Feb 13 20:58:23 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
    */
   public CacheableDeserializer<Cacheable> getDeserializer();
 
+  /**
+   * @return the block type of this cached HFile block
+   */
+  public BlockType getBlockType();
+
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Wed Feb 13 20:58:23 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
    * @return T the deserialized object.
    */
   public T deserialize(ByteBuffer b) throws IOException;
+
+  /**
+   * 
+   * @param b
+   * @param reuse true if Cacheable object can use the given buffer as its
+   *          content
+   * @return T the deserialized object.
+   * @throws IOException
+   */
+  public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+  /**
+   * Get the identifier of this deserialiser. Identifier is unique for each
+   * deserializer and generated by {@link CacheableDeserializerIdManager}
+   * @return identifier number of this cacheable deserializer
+   */
+  public int getDeserialiserIdentifier();
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Wed Feb 13 20:58:23 2013
@@ -96,11 +96,24 @@ public class CachedBlock implements Heap
     return size;
   }
 
+  @Override
   public int compareTo(CachedBlock that) {
     if(this.accessTime == that.accessTime) return 0;
     return this.accessTime < that.accessTime ? 1 : -1;
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    CachedBlock other = (CachedBlock) obj;
+    return compareTo(other) == 0;
+  }
+
   public Cacheable getBuffer() {
     return this.buf;
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java Wed Feb 13 20:58:23 2013
@@ -54,7 +54,7 @@ public class ChecksumUtil {
    *                    compute checkums from
    * @param endOffset ending offset in the indata stream upto
    *                   which checksums needs to be computed
-   * @param outData the output buffer where checksum values are written
+   * @param outdata the output buffer where checksum values are written
    * @param outOffset the starting offset in the outdata where the
    *                  checksum values are written
    * @param checksumType type of checksum

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Wed Feb 13 20:58:23 2013
@@ -18,13 +18,10 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,7 +30,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.RawComparator;
 
@@ -57,6 +56,9 @@ public class FixedFileTrailer {
 
   private static final Log LOG = LogFactory.getLog(FixedFileTrailer.class);
 
+  /** HFile minor version that introduced pbuf filetrailer */
+  private static final int PBUF_TRAILER_MINOR_VERSION = 2;
+
   /**
    * We store the comparator class name as a fixed-length field in the trailer.
    */
@@ -113,7 +115,7 @@ public class FixedFileTrailer {
   private long lastDataBlockOffset;
 
   /** Raw key comparator class name in version 2 */
-  private String comparatorClassName = RawComparator.class.getName();
+  private String comparatorClassName = KeyValue.KEY_COMPARATOR.getClass().getName();
 
   /** The {@link HFile} format major version. */
   private final int majorVersion;
@@ -129,11 +131,10 @@ public class FixedFileTrailer {
 
   private static int[] computeTrailerSizeByVersion() {
     int versionToSize[] = new int[HFile.MAX_FORMAT_VERSION + 1];
-    for (int version = MIN_FORMAT_VERSION;
-         version <= MAX_FORMAT_VERSION;
+    for (int version = HFile.MIN_FORMAT_VERSION;
+         version <= HFile.MAX_FORMAT_VERSION;
          ++version) {
-      FixedFileTrailer fft = new FixedFileTrailer(version, 
-                                   HFileBlock.MINOR_VERSION_NO_CHECKSUM);
+      FixedFileTrailer fft = new FixedFileTrailer(version, HFileBlock.MINOR_VERSION_NO_CHECKSUM);
       DataOutputStream dos = new DataOutputStream(new NullOutputStream());
       try {
         fft.serialize(dos);
@@ -148,8 +149,8 @@ public class FixedFileTrailer {
 
   private static int getMaxTrailerSize() {
     int maxSize = 0;
-    for (int version = MIN_FORMAT_VERSION;
-         version <= MAX_FORMAT_VERSION;
+    for (int version = HFile.MIN_FORMAT_VERSION;
+         version <= HFile.MAX_FORMAT_VERSION;
          ++version)
       maxSize = Math.max(getTrailerSize(version), maxSize);
     return maxSize;
@@ -158,6 +159,8 @@ public class FixedFileTrailer {
   private static final int TRAILER_SIZE[] = computeTrailerSizeByVersion();
   private static final int MAX_TRAILER_SIZE = getMaxTrailerSize();
 
+  private static final int NOT_PB_SIZE = BlockType.MAGIC_LENGTH + Bytes.SIZEOF_INT;
+
   static int getTrailerSize(int version) {
     return TRAILER_SIZE[version];
   }
@@ -178,42 +181,89 @@ public class FixedFileTrailer {
     HFile.checkFormatVersion(majorVersion);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutput baosDos = new DataOutputStream(baos);
+    DataOutputStream baosDos = new DataOutputStream(baos);
 
     BlockType.TRAILER.write(baosDos);
-    baosDos.writeLong(fileInfoOffset);
-    baosDos.writeLong(loadOnOpenDataOffset);
-    baosDos.writeInt(dataIndexCount);
+    if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+      serializeAsPB(baosDos);
+    } else {
+      serializeAsWritable(baosDos);
+    }
+
+    // The last 4 bytes of the file encode the major and minor version universally
+    baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
+
+    outputStream.write(baos.toByteArray());
+  }
+
+  /**
+   * Write trailer data as protobuf
+   * @param outputStream
+   * @throws IOException
+   */
+  void serializeAsPB(DataOutputStream output) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    HFileProtos.FileTrailerProto.newBuilder()
+      .setFileInfoOffset(fileInfoOffset)
+      .setLoadOnOpenDataOffset(loadOnOpenDataOffset)
+      .setUncompressedDataIndexSize(uncompressedDataIndexSize)
+      .setTotalUncompressedBytes(totalUncompressedBytes)
+      .setDataIndexCount(dataIndexCount)
+      .setMetaIndexCount(metaIndexCount)
+      .setEntryCount(entryCount)
+      .setNumDataIndexLevels(numDataIndexLevels)
+      .setFirstDataBlockOffset(firstDataBlockOffset)
+      .setLastDataBlockOffset(lastDataBlockOffset)
+      .setComparatorClassName(comparatorClassName)
+      .setCompressionCodec(compressionCodec.ordinal())
+      .build().writeDelimitedTo(baos);
+    output.write(baos.toByteArray());
+    // Pad to make up the difference between variable PB encoding length and the
+    // length when encoded as writable under earlier V2 formats. Failure to pad
+    // properly or if the PB encoding is too big would mean the trailer wont be read
+    // in properly by HFile.
+    int padding = getTrailerSize() - NOT_PB_SIZE - baos.size();
+    if (padding < 0) {
+      throw new IOException("Pbuf encoding size exceeded fixed trailer size limit");
+    }
+    for (int i = 0; i < padding; i++) {
+      output.write(0);
+    }
+  }
+
+  /**
+   * Write trailer data as writable
+   * @param outputStream
+   * @throws IOException
+   */
+  void serializeAsWritable(DataOutputStream output) throws IOException {
+    output.writeLong(fileInfoOffset);
+    output.writeLong(loadOnOpenDataOffset);
+    output.writeInt(dataIndexCount);
 
     if (majorVersion == 1) {
       // This used to be metaIndexOffset, but it was not used in version 1.
-      baosDos.writeLong(0);
+      output.writeLong(0);
     } else {
-      baosDos.writeLong(uncompressedDataIndexSize);
+      output.writeLong(uncompressedDataIndexSize);
     }
 
-    baosDos.writeInt(metaIndexCount);
-    baosDos.writeLong(totalUncompressedBytes);
+    output.writeInt(metaIndexCount);
+    output.writeLong(totalUncompressedBytes);
     if (majorVersion == 1) {
-      baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
+      output.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
     } else {
       // This field is long from version 2 onwards.
-      baosDos.writeLong(entryCount);
+      output.writeLong(entryCount);
     }
-    baosDos.writeInt(compressionCodec.ordinal());
+    output.writeInt(compressionCodec.ordinal());
 
     if (majorVersion > 1) {
-      baosDos.writeInt(numDataIndexLevels);
-      baosDos.writeLong(firstDataBlockOffset);
-      baosDos.writeLong(lastDataBlockOffset);
-      Bytes.writeStringFixedSize(baosDos, comparatorClassName,
-          MAX_COMPARATOR_NAME_LENGTH);
+      output.writeInt(numDataIndexLevels);
+      output.writeLong(firstDataBlockOffset);
+      output.writeLong(lastDataBlockOffset);
+      Bytes.writeStringFixedSize(output, comparatorClassName, MAX_COMPARATOR_NAME_LENGTH);
     }
-
-    // serialize the major and minor versions
-    baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
-
-    outputStream.write(baos.toByteArray());
   }
 
   /**
@@ -222,7 +272,6 @@ public class FixedFileTrailer {
    * {@link #serialize(DataOutputStream)}.
    *
    * @param inputStream
-   * @param version
    * @throws IOException
    */
   void deserialize(DataInputStream inputStream) throws IOException {
@@ -230,33 +279,100 @@ public class FixedFileTrailer {
 
     BlockType.TRAILER.readAndCheck(inputStream);
 
-    fileInfoOffset = inputStream.readLong();
-    loadOnOpenDataOffset = inputStream.readLong();
-    dataIndexCount = inputStream.readInt();
-
-    if (majorVersion == 1) {
-      inputStream.readLong(); // Read and skip metaIndexOffset.
+    if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+      deserializeFromPB(inputStream);
     } else {
-      uncompressedDataIndexSize = inputStream.readLong();
-    }
-    metaIndexCount = inputStream.readInt();
-
-    totalUncompressedBytes = inputStream.readLong();
-    entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
-    compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
-    if (majorVersion > 1) {
-      numDataIndexLevels = inputStream.readInt();
-      firstDataBlockOffset = inputStream.readLong();
-      lastDataBlockOffset = inputStream.readLong();
-      comparatorClassName =
-          Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
+      deserializeFromWritable(inputStream);
     }
 
+    // The last 4 bytes of the file encode the major and minor version universally
     int version = inputStream.readInt();
     expectMajorVersion(extractMajorVersion(version));
     expectMinorVersion(extractMinorVersion(version));
   }
 
+  /**
+   * Deserialize the file trailer as protobuf
+   * @param inputStream
+   * @throws IOException
+   */
+  void deserializeFromPB(DataInputStream inputStream) throws IOException {
+    // read PB and skip padding
+    int start = inputStream.available();
+    HFileProtos.FileTrailerProto.Builder builder = HFileProtos.FileTrailerProto.newBuilder();
+    builder.mergeDelimitedFrom(inputStream);
+    int size = start - inputStream.available();
+    inputStream.skip(getTrailerSize() - NOT_PB_SIZE - size);
+
+    // process the PB
+    if (builder.hasFileInfoOffset()) {
+      fileInfoOffset = builder.getFileInfoOffset();
+    }
+    if (builder.hasLoadOnOpenDataOffset()) {
+      loadOnOpenDataOffset = builder.getLoadOnOpenDataOffset();
+    }
+    if (builder.hasUncompressedDataIndexSize()) {
+      uncompressedDataIndexSize = builder.getUncompressedDataIndexSize();
+    }
+    if (builder.hasTotalUncompressedBytes()) {
+      totalUncompressedBytes = builder.getTotalUncompressedBytes();
+    }
+    if (builder.hasDataIndexCount()) {
+      dataIndexCount = builder.getDataIndexCount();
+    }
+    if (builder.hasMetaIndexCount()) {
+      metaIndexCount = builder.getMetaIndexCount();
+    }
+    if (builder.hasEntryCount()) {
+      entryCount = builder.getEntryCount();
+    }
+    if (builder.hasNumDataIndexLevels()) {
+      numDataIndexLevels = builder.getNumDataIndexLevels();
+    }
+    if (builder.hasFirstDataBlockOffset()) {
+      firstDataBlockOffset = builder.getFirstDataBlockOffset();
+    }
+    if (builder.hasLastDataBlockOffset()) {
+      lastDataBlockOffset = builder.getLastDataBlockOffset();
+    }
+    if (builder.hasComparatorClassName()) {
+      setComparatorClass(getComparatorClass(builder.getComparatorClassName()));
+    }
+    if (builder.hasCompressionCodec()) {
+      compressionCodec = Compression.Algorithm.values()[builder.getCompressionCodec()];
+    } else {
+      compressionCodec = Compression.Algorithm.NONE;
+    }
+  }
+
+  /**
+   * Deserialize the file trailer as writable data
+   * @param input
+   * @throws IOException
+   */
+  void deserializeFromWritable(DataInput input) throws IOException {
+    fileInfoOffset = input.readLong();
+    loadOnOpenDataOffset = input.readLong();
+    dataIndexCount = input.readInt();
+    if (majorVersion == 1) {
+      input.readLong(); // Read and skip metaIndexOffset.
+    } else {
+      uncompressedDataIndexSize = input.readLong();
+    }
+    metaIndexCount = input.readInt();
+
+    totalUncompressedBytes = input.readLong();
+    entryCount = majorVersion == 1 ? input.readInt() : input.readLong();
+    compressionCodec = Compression.Algorithm.values()[input.readInt()];
+    if (majorVersion > 1) {
+      numDataIndexLevels = input.readInt();
+      firstDataBlockOffset = input.readLong();
+      lastDataBlockOffset = input.readLong();
+      setComparatorClass(getComparatorClass(Bytes.readStringFixedSize(input,
+        MAX_COMPARATOR_NAME_LENGTH)));
+    }
+  }
+  
   private void append(StringBuilder sb, String s) {
     if (sb.length() > 0)
       sb.append(", ");
@@ -450,6 +566,10 @@ public class FixedFileTrailer {
     this.firstDataBlockOffset = firstDataBlockOffset;
   }
 
+  public String getComparatorClassName() {
+    return comparatorClassName;
+  }
+
   /**
    * Returns the major version of this HFile format
    */
@@ -466,7 +586,13 @@ public class FixedFileTrailer {
 
   @SuppressWarnings("rawtypes")
   public void setComparatorClass(Class<? extends RawComparator> klass) {
-    expectAtLeastMajorVersion(2);
+    // Is the comparator instantiable
+    try {
+      klass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Comparator class " + klass.getName() +
+        " is not instantiable", e);
+    }
     comparatorClassName = klass.getName();
   }
 
@@ -486,9 +612,11 @@ public class FixedFileTrailer {
     try {
       return getComparatorClass(comparatorClassName).newInstance();
     } catch (InstantiationException e) {
-      throw new IOException(e);
+      throw new IOException("Comparator class " + comparatorClassName +
+        " is not instantiable", e);
     } catch (IllegalAccessException e) {
-      throw new IOException(e);
+      throw new IOException("Comparator class " + comparatorClassName +
+        " is not instantiable", e);
     }
   }
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Feb 13 20:58:23 2013
@@ -165,6 +165,9 @@ public class HFile {
   public final static String DEFAULT_COMPRESSION =
     DEFAULT_COMPRESSION_ALGORITHM.getName();
 
+  /** Meta data block name for bloom filter bits. */
+  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
   /**
    * We assume that HFile path ends with
    * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
@@ -447,8 +450,6 @@ public class HFile {
       CacheConfig cacheConf) {
     int version = getFormatVersion(conf);
     switch (version) {
-    case 1:
-      return new HFileWriterV1.WriterFactoryV1(conf, cacheConf);
     case 2:
       return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
     default:
@@ -557,9 +558,6 @@ public class HFile {
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
     }
     switch (trailer.getMajorVersion()) {
-    case 1:
-      return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
-          cacheConf);
     case 2:
       return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
           size, closeIStream,

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Wed Feb 13 20:58:23 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
 
-  static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
-      Bytes.SIZEOF_INT;
+  // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+  public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
 
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-        public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-          ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-          buf.limit(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-          newByteBuffer.put(buf);
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, 
-                                   MINOR_VERSION_NO_CHECKSUM);
-
+        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+          ByteBuffer newByteBuffer;
+          if (reuse) {
+            newByteBuffer = buf.slice();
+          } else {
+           newByteBuffer = ByteBuffer.allocate(buf.limit());
+           newByteBuffer.put(buf);
+          }
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          int minorVersion=buf.getInt();
+          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
           ourBuffer.offset = buf.getLong();
           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
           return ourBuffer;
         }
+        
+        @Override
+        public int getDeserialiserIdentifier() {
+          return deserializerIdentifier;
+        }
+
+        @Override
+        public HFileBlock deserialize(ByteBuffer b) throws IOException {
+          return deserialize(b, false);
+        }
       };
+  private static final int deserializerIdentifier;
+  static {
+    deserializerIdentifier = CacheableDeserializerIdManager
+        .registerDeserializer(blockDeserializer);
+  }
 
   private BlockType blockType;
 
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
   }
 
   /**
+   * Returns the buffer of this block, including header data. The clients must
+   * not modify the buffer object. This method has to be public because it is
+   * used in {@link BucketCache} to avoid buffer copy.
+   * 
+   * @return the byte buffer with header included for read-only operations
+   */
+  public ByteBuffer getBufferReadOnlyWithHeader() {
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+  }
+
+  /**
    * Returns a byte buffer of this block, including header data, positioned at
    * the beginning of header. The underlying data array is not copied.
    *
@@ -1287,110 +1317,6 @@ public class HFileBlock implements Cache
   }
 
   /**
-   * Reads version 1 blocks from the file system. In version 1 blocks,
-   * everything is compressed, including the magic record, if compression is
-   * enabled. Everything might be uncompressed if no compression is used. This
-   * reader returns blocks represented in the uniform version 2 format in
-   * memory.
-   */
-  static class FSReaderV1 extends AbstractFSReader {
-
-    /** Header size difference between version 1 and 2 */
-    private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM - 
-                                            MAGIC_LENGTH;
-
-    public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) throws IOException {
-      super(istream, istream, compressAlgo, fileSize, 0, null, null);
-    }
-
-    /**
-     * Read a version 1 block. There is no uncompressed header, and the block
-     * type (the magic record) is part of the compressed data. This
-     * implementation assumes that the bounded range file input stream is
-     * needed to stop the decompressor reading into next block, because the
-     * decompressor just grabs a bunch of data without regard to whether it is
-     * coming to end of the compressed section.
-     *
-     * The block returned is still a version 2 block, and in particular, its
-     * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
-     *
-     * @param offset the offset of the block to read in the file
-     * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
-     *          including the magic record, which is the part of compressed
-     *          data if using compression
-     * @param uncompressedSizeWithMagic uncompressed size of the version 1
-     *          block, including the magic record
-     */
-    @Override
-    public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
-        int uncompressedSizeWithMagic, boolean pread) throws IOException {
-      if (uncompressedSizeWithMagic <= 0) {
-        throw new IOException("Invalid uncompressedSize="
-            + uncompressedSizeWithMagic + " for a version 1 block");
-      }
-
-      if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
-      {
-        throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
-            + " (maximum allowed: " + Integer.MAX_VALUE + ")");
-      }
-
-      int onDiskSize = (int) onDiskSizeWithMagic;
-
-      if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
-        throw new IOException("Uncompressed size for a version 1 block is "
-            + uncompressedSizeWithMagic + " but must be at least "
-            + MAGIC_LENGTH);
-      }
-
-      // The existing size already includes magic size, and we are inserting
-      // a version 2 header.
-      ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
-          + HEADER_DELTA);
-
-      int onDiskSizeWithoutHeader;
-      if (compressAlgo == Compression.Algorithm.NONE) {
-        // A special case when there is no compression.
-        if (onDiskSize != uncompressedSizeWithMagic) {
-          throw new IOException("onDiskSize=" + onDiskSize
-              + " and uncompressedSize=" + uncompressedSizeWithMagic
-              + " must be equal for version 1 with no compression");
-        }
-
-        // The first MAGIC_LENGTH bytes of what this will read will be
-        // overwritten.
-        readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
-            onDiskSize, false, offset, pread);
-
-        onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
-      } else {
-        InputStream bufferedBoundedStream = createBufferedBoundedStream(
-            offset, onDiskSize, pread);
-        Compression.decompress(buf.array(), buf.arrayOffset()
-            + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
-            uncompressedSizeWithMagic, this.compressAlgo);
-
-        // We don't really have a good way to exclude the "magic record" size
-        // from the compressed block's size, since it is compressed as well.
-        onDiskSizeWithoutHeader = onDiskSize;
-      }
-
-      BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
-          + HEADER_DELTA, MAGIC_LENGTH);
-
-      // We set the uncompressed size of the new HFile block we are creating
-      // to the size of the data portion of the block without the magic record,
-      // since the magic record gets moved to the header.
-      HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
-          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
-          offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
-          onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
-      return b;
-    }
-  }
-
-  /**
    * We always prefetch the header of the next block, so that we know its
    * on-disk size in advance and can read it in one operation.
    */
@@ -1780,7 +1706,17 @@ public class HFileBlock implements Cache
 
   @Override
   public void serialize(ByteBuffer destination) {
-    destination.put(this.buf.duplicate());
+    ByteBuffer dupBuf = this.buf.duplicate();
+    dupBuf.rewind();
+    destination.put(dupBuf);
+    destination.putInt(this.minorVersion);
+    destination.putLong(this.offset);
+    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+    destination.rewind();
+  }
+
+  public void serializeExtraInfo(ByteBuffer destination) {
+    destination.putInt(this.minorVersion);
     destination.putLong(this.offset);
     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
     destination.rewind();

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Wed Feb 13 20:58:23 2013
@@ -77,8 +77,8 @@ public class HFileReaderV2 extends Abstr
   static final int MIN_MINOR_VERSION = 0;
 
   /** Maximum minor version supported by this HFile format */
-  // We went to version 2 when we moved to pb'ing the fileinfo trailer on the file. This version can read Writables
-  // version 1 too.
+  // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
+  // the file. This version can read Writables version 1.
   static final int MAX_MINOR_VERSION = 2;
 
   /**

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Feb 13 20:58:23 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
   /** Overhead of the structure itself */
   private long overhead;
 
+  /** Where to send victims (blocks evicted from the cache) */
+  private BucketCache victimHandler = null;
+
   /**
    * Default constructor.  Specify maximum size and expected average block
    * size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
     CachedBlock cb = map.get(cacheKey);
     if(cb == null) {
       if (!repeat) stats.miss(caching);
+      if (victimHandler != null)
+        return victimHandler.getBlock(cacheKey, caching, repeat);
       return null;
     }
     stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
     return cb.getBuffer();
   }
 
+  /**
+   * Whether the cache contains block with specified cacheKey
+   * @param cacheKey
+   * @return true if contains the block
+   */
+  public boolean containsBlock(BlockCacheKey cacheKey) {
+    return map.containsKey(cacheKey);
+  }
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
     CachedBlock cb = map.get(cacheKey);
     if (cb == null) return false;
-    evictBlock(cb);
+    evictBlock(cb, false);
     return true;
   }
 
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
           ++numEvicted;
       }
     }
+    if (victimHandler != null) {
+      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+    }
     return numEvicted;
   }
 
-  protected long evictBlock(CachedBlock block) {
+  /**
+   * Evict the block, and it will be cached by the victim handler if exists &&
+   * block may be read again later
+   * @param block
+   * @param evictedByEvictionProcess true if the given block is evicted by
+   *          EvictionThread
+   * @return the heap size of evicted block
+   */
+  protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
     map.remove(block.getCacheKey());
     updateSizeMetrics(block, true);
     elements.decrementAndGet();
     stats.evicted();
+    if (evictedByEvictionProcess && victimHandler != null) {
+      boolean wait = getCurrentSize() < acceptableSize();
+      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+          inMemory, wait);
+    }
     return block.heapSize();
   }
 
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
       CachedBlock cb;
       long freedBytes = 0;
       while ((cb = queue.pollLast()) != null) {
-        freedBytes += evictBlock(cb);
+        freedBytes += evictBlock(cb, true);
         if (freedBytes >= toFree) {
           return freedBytes;
         }
@@ -532,6 +564,16 @@ public class LruBlockCache implements Bl
       if(this.overflow() == that.overflow()) return 0;
       return this.overflow() > that.overflow() ? 1 : -1;
     }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null || !(that instanceof BlockBucket)){
+        return false;
+      }
+
+      return compareTo(( BlockBucket)that) == 0;
+    }
+
   }
 
   /**
@@ -625,13 +667,13 @@ public class LruBlockCache implements Bl
 
     public void evict() {
       synchronized(this) {
-        this.notify(); // FindBugs NN_NAKED_NOTIFY
+        this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
       }
     }
 
-    void shutdown() {
+    synchronized void shutdown() {
       this.go = false;
-      interrupt();
+      this.notifyAll();
     }
 
     /**
@@ -693,7 +735,7 @@ public class LruBlockCache implements Bl
   }
 
   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
-      (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+      (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
       + ClassSize.OBJECT);
 
@@ -762,6 +804,8 @@ public class LruBlockCache implements Bl
   }
 
   public void shutdown() {
+    if (victimHandler != null)
+      victimHandler.shutdown();
     this.scheduleThreadPool.shutdown();
     for (int i = 0; i < 10; i++) {
       if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -812,4 +856,9 @@ public class LruBlockCache implements Bl
     return counts;
   }
 
+  public void setVictimCache(BucketCache handler) {
+    assert victimHandler == null;
+    victimHandler = handler;
+  }
+
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java Wed Feb 13 20:58:23 2013
@@ -70,4 +70,4 @@ public class BlockingRpcCallback<R> impl
     }
     return result;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java Wed Feb 13 20:58:23 2013
@@ -25,11 +25,8 @@ import java.io.IOException;
  * but is only used for logging on the server side, etc.
  */
 public class CallerDisconnectedException extends IOException {
+  private static final long serialVersionUID = 1L;
   public CallerDisconnectedException(String msg) {
     super(msg);
   }
-
-  private static final long serialVersionUID = 1L;
-
-  
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Wed Feb 13 20:58:23 2013
@@ -70,4 +70,4 @@ public interface Delayable {
    * @throws IOException
    */
   public void endDelayThrowing(Throwable t) throws IOException;
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Feb 13 20:58:23 2013
@@ -28,6 +28,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -41,6 +42,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,6 +55,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -103,9 +106,11 @@ import com.google.protobuf.Message.Build
 @InterfaceAudience.Private
 public class HBaseClient {
 
-  public static final Log LOG = LogFactory
-      .getLog("org.apache.hadoop.ipc.HBaseClient");
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
   protected final PoolMap<ConnectionId, Connection> connections;
+  private static final Map<String, Method> methodInstances =
+      new ConcurrentHashMap<String, Method>();
 
   protected int counter;                            // counter for call ids
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -121,7 +126,6 @@ public class HBaseClient {
   protected FailedServers failedServers;
 
   protected final SocketFactory socketFactory;           // how to create sockets
-  private int refCount = 1;
   protected String clusterId;
 
   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,12 +190,13 @@ public class HBaseClient {
   }
 
   public static class FailedServerException extends IOException {
+    private static final long serialVersionUID = -4744376109431464127L;
+
     public FailedServerException(String s) {
       super(s);
     }
   }
 
-
   /**
    * set the ping interval value in configuration
    *
@@ -229,36 +234,11 @@ public class HBaseClient {
     return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
   }
 
-  /**
-   * Increment this client's reference count
-   *
-   */
-  synchronized void incCount() {
-    refCount++;
-  }
-
-  /**
-   * Decrement this client's reference count
-   *
-   */
-  synchronized void decCount() {
-    refCount--;
-  }
-
-  /**
-   * Return if this client has no reference
-   *
-   * @return true if this client has no reference; false otherwise
-   */
-  synchronized boolean isZeroReference() {
-    return refCount==0;
-  }
-
   /** A call waiting for a value. */
   protected class Call {
-    final int id;                                       // call id
-    final RpcRequestBody param;                         // rpc request object
-    Message value;                               // value, null if error
+    final int id;                                 // call id
+    final RpcRequestBody param;                   // rpc request object
+    Message value;                                // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
     long startTime;
@@ -302,6 +282,7 @@ public class HBaseClient {
       return this.startTime;
     }
   }
+
   protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
       new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
   static {
@@ -335,9 +316,12 @@ public class HBaseClient {
     private int reloginMaxBackoff; // max pause before relogin on sasl failure
 
     // currently active calls
-    protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
-    protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
-    protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
+    protected final ConcurrentSkipListMap<Integer, Call> calls =
+      new ConcurrentSkipListMap<Integer, Call>();
+    protected final AtomicLong lastActivity =
+      new AtomicLong(); // last I/O activity time
+    protected final AtomicBoolean shouldCloseConnection =
+      new AtomicBoolean();  // indicate if the connection is closed
     protected IOException closeException; // close reason
 
     Connection(ConnectionId remoteId) throws IOException {
@@ -414,16 +398,14 @@ public class HBaseClient {
         return null;
       }
       UserInformation.Builder userInfoPB = UserInformation.newBuilder();
-      if (ugi != null) {
-        if (authMethod == AuthMethod.KERBEROS) {
-          // Send effective user for Kerberos auth
-          userInfoPB.setEffectiveUser(ugi.getUserName());
-        } else if (authMethod == AuthMethod.SIMPLE) {
-          //Send both effective user and real user for simple auth
-          userInfoPB.setEffectiveUser(ugi.getUserName());
-          if (ugi.getRealUser() != null) {
-            userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-          }
+      if (authMethod == AuthMethod.KERBEROS) {
+        // Send effective user for Kerberos auth
+        userInfoPB.setEffectiveUser(ugi.getUserName());
+      } else if (authMethod == AuthMethod.SIMPLE) {
+        //Send both effective user and real user for simple auth
+        userInfoPB.setEffectiveUser(ugi.getUserName());
+        if (ugi.getRealUser() != null) {
+          userInfoPB.setRealUser(ugi.getRealUser().getUserName());
         }
       }
       return userInfoPB.build();
@@ -845,11 +827,17 @@ public class HBaseClient {
           start();
           return;
         }
-      } catch (IOException e) {
+      } catch (Throwable t) {
         failedServers.addToFailedServers(remoteId.address);
-        markClosed(e);
+        IOException e = null;
+        if (t instanceof IOException) {
+          e = (IOException)t;
+          markClosed(e);
+        } else {
+          e = new IOException("Coundn't set up IO Streams", t);
+          markClosed(e);
+        }
         close();
-
         throw e;
       }
     }
@@ -959,6 +947,24 @@ public class HBaseClient {
       }
     }
 
+
+    private Method getMethod(Class<? extends IpcProtocol> protocol,
+        String methodName) {
+      Method method = methodInstances.get(methodName);
+      if (method != null) {
+        return method;
+      }
+      Method[] methods = protocol.getMethods();
+      for (Method m : methods) {
+        if (m.getName().equals(methodName)) {
+          m.setAccessible(true);
+          methodInstances.put(methodName, m);
+          return m;
+        }
+      }
+      return null;
+    }
+
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
      */
@@ -990,9 +996,9 @@ public class HBaseClient {
         if (status == Status.SUCCESS) {
           Message rpcResponseType;
           try {
-            rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
-                ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
-                    call.param.getMethodName()));
+            rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+                getMethod(remoteId.getProtocol(),
+                          call.param.getMethodName()));
           } catch (Exception e) {
             throw new RuntimeException(e); //local exception
           }
@@ -1270,7 +1276,7 @@ public class HBaseClient {
    * Throws exceptions if there are network problems or if the remote code
    * threw an exception. */
   public Message call(RpcRequestBody param, InetSocketAddress addr,
-                       Class<? extends VersionedProtocol> protocol,
+                       Class<? extends IpcProtocol> protocol,
                        User ticket, int rpcTimeout)
       throws InterruptedException, IOException {
     Call call = new Call(param);
@@ -1317,7 +1323,6 @@ public class HBaseClient {
    * @param exception the relevant exception
    * @return an exception to throw
    */
-  @SuppressWarnings({"ThrowableInstanceNeverThrown"})
   protected IOException wrapException(InetSocketAddress addr,
                                          IOException exception) {
     if (exception instanceof ConnectException) {
@@ -1340,25 +1345,9 @@ public class HBaseClient {
   /** Makes a set of calls in parallel.  Each parameter is sent to the
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
-   * contains nulls for calls that timed out or errored.
-   * @param params RpcRequestBody parameters
-   * @param addresses socket addresses
-   * @return  RpcResponseBody[]
-   * @throws IOException e
-   * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
-   */
-  @Deprecated
-  public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
-    throws IOException, InterruptedException {
-    return call(params, addresses, null, null);
-  }
-
-  /** Makes a set of calls in parallel.  Each parameter is sent to the
-   * corresponding address.  When all values are available, or have timed out
-   * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
   public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
-                         Class<? extends VersionedProtocol> protocol,
+                         Class<? extends IpcProtocol> protocol,
                          User ticket)
       throws IOException, InterruptedException {
     if (addresses.length == 0) return new RpcResponseBody[0];
@@ -1393,7 +1382,7 @@ public class HBaseClient {
   /* Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
   protected Connection getConnection(InetSocketAddress addr,
-                                   Class<? extends VersionedProtocol> protocol,
+                                   Class<? extends IpcProtocol> protocol,
                                    User ticket,
                                    int rpcTimeout,
                                    Call call)
@@ -1436,11 +1425,10 @@ public class HBaseClient {
     final InetSocketAddress address;
     final User ticket;
     final int rpcTimeout;
-    Class<? extends VersionedProtocol> protocol;
+    Class<? extends IpcProtocol> protocol;
     private static final int PRIME = 16777619;
 
-    ConnectionId(InetSocketAddress address,
-        Class<? extends VersionedProtocol> protocol,
+    ConnectionId(InetSocketAddress address, Class<? extends IpcProtocol> protocol,
         User ticket,
         int rpcTimeout) {
       this.protocol = protocol;
@@ -1453,7 +1441,7 @@ public class HBaseClient {
       return address;
     }
 
-    Class<? extends VersionedProtocol> getProtocol() {
+    Class<? extends IpcProtocol> getProtocol() {
       return protocol;
     }
 
@@ -1479,4 +1467,4 @@ public class HBaseClient {
              (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Feb 13 20:58:23 2013
@@ -46,18 +46,17 @@ import java.nio.channels.WritableByteCha
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -68,28 +67,29 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,20 +97,20 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
 import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+// Uses Writables doing sasl
 
 /** A client for an IPC service.  IPC calls take a single Protobuf message as a
  * parameter, and return a single Protobuf message as their value.  A service runs on
@@ -169,22 +169,18 @@ public abstract class HBaseServer implem
     new ThreadLocal<RpcServer>();
   private volatile boolean started = false;
 
-  // For generated protocol classes which doesn't have VERSION field
-  private static final Map<Class<?>, Long>
-    PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
-
-  private static final Map<String, Class<? extends VersionedProtocol>>
-      PROTOCOL_CACHE =
-      new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
+  private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
+      new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
 
-  static Class<? extends VersionedProtocol> getProtocolClass(
+  @SuppressWarnings("unchecked")
+  static Class<? extends IpcProtocol> getProtocolClass(
       String protocolName, Configuration conf)
   throws ClassNotFoundException {
-    Class<? extends VersionedProtocol> protocol =
+    Class<? extends IpcProtocol> protocol =
         PROTOCOL_CACHE.get(protocolName);
 
     if (protocol == null) {
-      protocol = (Class<? extends VersionedProtocol>)
+      protocol = (Class<? extends IpcProtocol>)
           conf.getClassByName(protocolName);
       PROTOCOL_CACHE.put(protocolName, protocol);
     }
@@ -192,7 +188,7 @@ public abstract class HBaseServer implem
   }
 
   /** Returns the server instance called under or null.  May be called under
-   * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+   * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
    * and under protobuf methods of parameters and return values.
    * Permits applications to access the server context.
    * @return HBaseServer
@@ -263,8 +259,6 @@ public abstract class HBaseServer implem
 
   protected int highPriorityLevel;  // what level a high priority call is at
 
-  private volatile int responseQueueLen; // size of response queue for this server
-
   protected final List<Connection> connectionList =
     Collections.synchronizedList(new LinkedList<Connection>());
   //maintain a list
@@ -278,7 +272,7 @@ public abstract class HBaseServer implem
   protected BlockingQueue<Call> replicationQueue;
   private int numOfReplicationHandlers = 0;
   private Handler[] replicationHandlers = null;
-  
+
   protected HBaseRPCErrorHandler errorHandler = null;
 
   /**
@@ -358,7 +352,7 @@ public abstract class HBaseServer implem
       if (errorClass != null) {
         this.isError = true;
       }
- 
+
       ByteBufferOutputStream buf = null;
       if (value != null) {
         buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
@@ -460,7 +454,7 @@ public abstract class HBaseServer implem
     public synchronized boolean isReturnValueDelayed() {
       return this.delayReturnValue;
     }
-    
+
     @Override
     public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
       if (!connection.channel.isOpen()) {
@@ -1000,7 +994,6 @@ public abstract class HBaseServer implem
             return true;
           }
           if (!call.response.hasRemaining()) {
-            responseQueueLen--;
             call.connection.decRpcCount();
             //noinspection RedundantIfStatement
             if (numElements == 1) {    // last call fully processes.
@@ -1070,7 +1063,6 @@ public abstract class HBaseServer implem
     void doRespond(Call call) throws IOException {
       // set the serve time when the response has to be sent later
       call.timestamp = System.currentTimeMillis();
-      responseQueueLen++;
 
       boolean doRegister = false;
       synchronized (call.connection.responseQueue) {
@@ -1120,7 +1112,7 @@ public abstract class HBaseServer implem
     protected String hostAddress;
     protected int remotePort;
     ConnectionHeader header;
-    Class<? extends VersionedProtocol> protocol;
+    Class<? extends IpcProtocol> protocol;
     protected UserGroupInformation user = null;
     private AuthMethod authMethod;
     private boolean saslContextEstablished;
@@ -1324,7 +1316,7 @@ public abstract class HBaseServer implem
             LOG.debug("SASL server context established. Authenticated client: "
               + user + ". Negotiated QoP is "
               + saslServer.getNegotiatedProperty(Sasl.QOP));
-          }          
+          }
           metrics.authenticationSuccess();
           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
           saslContextEstablished = true;
@@ -1437,7 +1429,7 @@ public abstract class HBaseServer implem
             }
           }
           if (dataLength < 0) {
-            throw new IllegalArgumentException("Unexpected data length " 
+            throw new IllegalArgumentException("Unexpected data length "
                 + dataLength + "!! from " + getHostAddress());
           }
           data = ByteBuffer.allocate(dataLength);
@@ -1758,7 +1750,7 @@ public abstract class HBaseServer implem
           status.pause("Waiting for a call");
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
           status.setStatus("Setting up call");
-          status.setConnection(call.connection.getHostAddress(), 
+          status.setConnection(call.connection.getHostAddress(),
               call.connection.getRemotePort());
 
           if (LOG.isDebugEnabled())
@@ -2019,11 +2011,12 @@ public abstract class HBaseServer implem
     }
     return handlers;
   }
-  
+
   public SecretManager<? extends TokenIdentifier> getSecretManager() {
     return this.secretManager;
   }
 
+  @SuppressWarnings("unchecked")
   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
   }
@@ -2051,7 +2044,7 @@ public abstract class HBaseServer implem
       }
     }
   }
-  
+
   /** Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
    *  See {@link #stop()}.
@@ -2110,7 +2103,7 @@ public abstract class HBaseServer implem
                                          connection.getProtocol());
       }
       authManager.authorize(user != null ? user : null,
-          protocol, getConf(), addr);
+        protocol, getConf(), addr);
     }
   }
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java Wed Feb 13 20:58:23 2013
@@ -18,26 +18,20 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 
-import java.io.IOException;
-
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
 
 /**
  * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s



Mime
View raw message