hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject svn commit: r1524949 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/
Date Fri, 20 Sep 2013 08:51:38 GMT
Author: anoopsamjohn
Date: Fri Sep 20 08:51:37 2013
New Revision: 1524949

URL: http://svn.apache.org/r1524949
Log:
HBASE-9244 Add CP hooks around StoreFileReader creation

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
Fri Sep 20 08:51:37 2013
@@ -22,6 +22,8 @@ import java.util.NavigableSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -38,6 +40,10 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -46,6 +52,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -455,4 +462,18 @@ public abstract class BaseRegionObserver
     List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException
{
     return hasLoaded;
   }
+
+  @Override
+  public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException
{
+    return reader;
+  }
+
+  @Override
+  public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException
{
+    return reader;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
Fri Sep 20 08:51:37 2013
@@ -21,6 +21,8 @@ import java.util.NavigableSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -36,6 +38,10 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -1002,4 +1008,47 @@ public interface RegionObserver extends 
    */
   boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
+
+  /**
+   * Called before creation of Reader for a store file.
+   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * effect in this hook.
+   * 
+   * @param ctx the environment provided by the region server
+   * @param fs fileystem to read from
+   * @param p path to the file
+   * @param in {@link FSDataInputStreamWrapper}
+   * @param size Full size of the file
+   * @param cacheConf
+   * @param preferredEncodingInCache
+   * @param r original reference file. This will be not null only when reading a split file.
+   * @param reader the base reader, if not {@code null}, from previous RegionObserver in
the chain
+   * @return a Reader instance to use instead of the base reader if overriding
+   * default behavior, null otherwise
+   * @throws IOException
+   */
+  StoreFile.Reader preStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
+      final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
+      final Reference r, StoreFile.Reader reader) throws IOException;
+
+  /**
+   * Called after the creation of Reader for a store file.
+   * 
+   * @param ctx the environment provided by the region server
+   * @param fs fileystem to read from
+   * @param p path to the file
+   * @param in {@link FSDataInputStreamWrapper}
+   * @param size Full size of the file
+   * @param cacheConf
+   * @param preferredEncodingInCache
+   * @param r original reference file. This will be not null only when reading a split file.
+   * @param reader the base reader instance
+   * @return The reader to use
+   * @throws IOException
+   */
+  StoreFile.Reader postStoreFileReaderOpen(final ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
+      final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
+      final Reference r, StoreFile.Reader reader) throws IOException;
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
Fri Sep 20 08:51:37 2013
@@ -474,7 +474,9 @@ public class HStore implements Store {
   }
 
   private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder)
throws IOException {
-    StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
+    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
+    StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), encoder);
     storeFile.createReader();
     return storeFile;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
Fri Sep 20 08:51:37 2013
@@ -33,6 +33,7 @@ import org.apache.commons.collections.ma
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -56,7 +57,11 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -1636,4 +1641,71 @@ public class RegionCoprocessorHost
     return hasLoaded;
   }
 
+  /**
+   * @param fs fileystem to read from
+   * @param p path to the file
+   * @param in {@link FSDataInputStreamWrapper}
+   * @param size Full size of the file
+   * @param cacheConf
+   * @param preferredEncodingInCache
+   * @param r original reference file. This will be not null only when reading a split file.
+   * @return a Reader instance to use instead of the base reader if overriding
+   * default behavior, null otherwise
+   * @throws IOException
+   */
+  public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
+      final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf,
+      final DataBlockEncoding preferredEncodingInCache, final Reference r) throws IOException
{
+    StoreFile.Reader reader = null;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p,
in,
+              size, cacheConf, preferredEncodingInCache, r, reader);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return reader;
+  }
+
+  /**
+   * @param fs fileystem to read from
+   * @param p path to the file
+   * @param in {@link FSDataInputStreamWrapper}
+   * @param size Full size of the file
+   * @param cacheConf
+   * @param preferredEncodingInCache
+   * @param r original reference file. This will be not null only when reading a split file.
+   * @param reader the base reader instance
+   * @return The reader to use
+   * @throws IOException
+   */
+  public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
+      final FSDataInputStreamWrapper in, long size, final CacheConfig cacheConf,
+      final DataBlockEncoding preferredEncodingInCache, final Reference r, StoreFile.Reader
reader)
+      throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs,
p, in,
+              size, cacheConf, preferredEncodingInCache, r, reader);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return reader;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
Fri Sep 20 08:51:37 2013
@@ -79,6 +79,8 @@ public class StoreFileInfo {
   // FileSystem information for the file.
   private final FileStatus fileStatus;
 
+  private RegionCoprocessorHost coprocessorHost;
+
   /**
    * Create a Store File Info
    * @param conf the {@link Configuration} to use
@@ -126,6 +128,14 @@ public class StoreFileInfo {
     }
   }
 
+  /**
+   * Sets the region coprocessor env.
+   * @param coprocessorHost
+   */
+  public void setRegionCoprocessorHost(RegionCoprocessorHost coprocessorHost) {
+    this.coprocessorHost = coprocessorHost;
+  }
+
   /*
    * @return the Reference object associated to this StoreFileInfo.
    *         null if the StoreFile is not a reference.
@@ -182,12 +192,27 @@ public class StoreFileInfo {
     long length = status.getLen();
     if (this.reference != null) {
       hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
-      return new HalfStoreFileReader(
-          fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding);
     } else {
       hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
-      return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
     }
+    StoreFile.Reader reader = null;
+    if (this.coprocessorHost != null) {
+      reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
+          cacheConf, dataBlockEncoding, reference);
+    }
+    if (reader == null) {
+      if (this.reference != null) {
+        reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
+            dataBlockEncoding);
+      } else {
+        reader = new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
+      }
+    }
+    if (this.coprocessorHost != null) {
+      reader = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(), in, length,
+          cacheConf, dataBlockEncoding, reference, reader);
+    }
+    return reader;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1524949&r1=1524948&r2=1524949&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
Fri Sep 20 08:51:37 2013
@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.Atomi
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -47,6 +49,10 @@ import org.apache.hadoop.hbase.client.In
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -56,6 +62,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -110,7 +117,8 @@ public class SimpleRegionObserver extend
   final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
   final AtomicInteger ctPreSplitBeforePONR = new AtomicInteger(0);
   final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
-
+  final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
+  final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
 
   final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
 
@@ -544,6 +552,21 @@ public class SimpleRegionObserver extend
     ctPostWALRestore.incrementAndGet();
   }
 
+  @Override
+  public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException
{
+    ctPreStoreFileReaderOpen.incrementAndGet();
+    return null;
+  }
+
+  @Override
+  public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException
{
+    ctPostStoreFileReaderOpen.incrementAndGet();
+    return reader;
+  }
 
   public boolean hadPreGet() {
     return ctPreGet.get() > 0;
@@ -725,4 +748,8 @@ public class SimpleRegionObserver extend
   public int getCtPostWALRestore() {
     return ctPostWALRestore.get();
   }
+
+  public boolean wasStoreFileReaderOpenCalled() {
+    return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get()
> 0;
+  }
 }



Mime
View raw message