hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r1353060 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/
Date Sat, 23 Jun 2012 01:54:13 GMT
Author: apurtell
Date: Sat Jun 23 01:54:12 2012
New Revision: 1353060

URL: http://svn.apache.org/viewvc?rev=1353060&view=rev
Log:
HBASE-6224. Add pre and post coprocessor hooks for bulk load (Francis Liu)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	Sat Jun 23 01:54:12 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
 
 import java.io.IOException;
 
@@ -274,4 +275,15 @@ public abstract class BaseRegionObserver
   public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
   }
+
+  @Override
+  public void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths) throws IOException {
+  }
+
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    return hasLoaded;
+  }
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	Sat Jun 23 01:54:12 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Coprocessors implement this interface to observe and mediate client actions
@@ -651,4 +652,28 @@ public interface RegionObserver extends 
    */
   void postWALRestore(final ObserverContext<RegionCoprocessorEnvironment> ctx,
       HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException;
+
+  /**
+   * Called before bulkLoadHFile. Users can create a StoreFile instance to
+   * access the contents of a HFile.
+   *
+   * @param ctx
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load. Adding
+   * or removing from this list will add or remove HFiles to be bulk loaded.
+   * @throws IOException
+   */
+  void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths) throws IOException;
+
+  /**
+   * Called after bulkLoadHFile.
+   *
+   * @param ctx
+   * @param familyPaths pairs of { CF, HFile path } submitted for bulk load
+   * @param hasLoaded whether the bulkLoad was successful
+   * @return the new value of hasLoaded
+   * @throws IOException
+   */
+  boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+    List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	Sat Jun 23 01:54:12 2012
@@ -2662,7 +2662,18 @@ public class HRegionServer implements HR
       byte[] regionName) throws IOException {
     checkOpen();
     HRegion region = getRegion(regionName);
-    return region.bulkLoadHFiles(familyPaths);
+    boolean bypass = false;
+    if (region.getCoprocessorHost() != null) {
+      bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+    }
+    boolean loaded = false;
+    if (!bypass) {
+      loaded = region.bulkLoadHFiles(familyPaths);
+    }
+    if (region.getCoprocessorHost() != null) {
+      loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+    }
+    return loaded;
   }
 
   Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	Sat Jun 23 01:54:12 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.Copro
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
@@ -1267,4 +1268,58 @@ public class RegionCoprocessorHost
       }
     }
   }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @return true if the default operation should be bypassed
+   * @throws IOException
+   */
+  public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
+    boolean bypass = false;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        bypass |= ctx.shouldBypass();
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+
+    return bypass;
+  }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @param hasLoaded whether load was successful or not
+   * @return the possibly modified value of hasLoaded
+   * @throws IOException
+   */
+  public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
+      throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
+            familyPaths, hasLoaded);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+
+    return hasLoaded;
+  }
+
 }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	Sat Jun 23 01:54:12 2012
@@ -231,7 +231,7 @@ public class StoreFile extends SchemaCon
    * @param dataBlockEncoder data block encoding algorithm.
    * @throws IOException When opening the reader fails.
    */
-  StoreFile(final FileSystem fs,
+  public StoreFile(final FileSystem fs,
             final Path p,
             final Configuration conf,
             final CacheConfig cacheConf,

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java	(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java	Sat Jun 23 01:54:12 2012
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.regionser
  * KeyValueScanner adaptor over the Reader.  It also provides hooks into
  * bloom filter things.
  */
-class StoreFileScanner implements KeyValueScanner {
+public class StoreFileScanner implements KeyValueScanner {
   static final Log LOG = LogFactory.getLog(Store.class);
 
   // the reader it comes from:

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	Sat Jun 23 01:54:12 2012
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.coprocessor;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A sample region observer that tests the RegionObserver interface.
@@ -85,6 +88,8 @@ public class SimpleRegionObserver extend
   boolean hadPostScannerClose = false;
   boolean hadPreScannerOpen = false;
   boolean hadPostScannerOpen = false;
+  boolean hadPreBulkLoadHFile = false;
+  boolean hadPostBulkLoadHFile = false;
 
   @Override
   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -384,6 +389,43 @@ public class SimpleRegionObserver extend
     return result;
   }
 
+  @Override
+  public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                               List<Pair<byte[], String>> familyPaths) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1,familyPaths.size());
+      assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+      String familyPath = familyPaths.get(0).getSecond();
+      String familyName = Bytes.toString(TestRegionObserverInterface.A);
+      assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+    }
+    hadPreBulkLoadHFile = true;
+  }
+
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                                   List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+    RegionCoprocessorEnvironment e = ctx.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    if (Arrays.equals(e.getRegion().getTableDesc().getName(),
+        TestRegionObserverInterface.TEST_TABLE)) {
+      assertNotNull(familyPaths);
+      assertEquals(1,familyPaths.size());
+      assertArrayEquals(familyPaths.get(0).getFirst(), TestRegionObserverInterface.A);
+      String familyPath = familyPaths.get(0).getSecond();
+      String familyName = Bytes.toString(TestRegionObserverInterface.A);
+      assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
+    }
+    hadPostBulkLoadHFile = true;
+    return hasLoaded;
+  }
+
   public boolean hadPreGet() {
     return hadPreGet;
   }
@@ -430,4 +472,12 @@ public class SimpleRegionObserver extend
   public boolean hadDeleted() {
     return hadPreDeleted && hadPostDeleted;
   }
+
+  public boolean hadPostBulkLoadHFile() {
+    return hadPostBulkLoadHFile;
+  }
+
+  public boolean hadPreBulkLoadHFile() {
+    return hadPreBulkLoadHFile;
+  }
 }

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1353060&r1=1353059&r2=1353060&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java	(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java	Sat Jun 23 01:54:12 2012
@@ -29,8 +29,13 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@@ -416,6 +421,37 @@ public class TestRegionObserverInterface
     table.close();
   }
 
+  @Test
+  public void bulkLoadHFileTest() throws Exception {
+    String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
+    byte[] tableName = TEST_TABLE;
+    Configuration conf = util.getConfiguration();
+    HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+        tableName,
+        new Boolean[] {false, false}
+    );
+
+    FileSystem fs = util.getTestFileSystem();
+    final Path dir = util.getDataTestDir(testName).makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(A));
+
+    createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+    //Bulk load
+    new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+        tableName,
+        new Boolean[] {true, true}
+    );
+    util.deleteTable(tableName);
+    table.close();
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class c, String methodName[], byte[] tableName,
                                   Object value[]) throws IOException {
@@ -444,6 +480,25 @@ public class TestRegionObserverInterface
     }
   }
 
+  private static void createHFile(
+      Configuration conf,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier) throws IOException {
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+        .withPath(fs, path)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+    long now = System.currentTimeMillis();
+    try {
+      for (int i =1;i<=9;i++) {
+        KeyValue kv = new KeyValue(Bytes.toBytes(i+""), family, qualifier, now, Bytes.toBytes(i+""));
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
   private static byte [][] makeN(byte [] base, int n) {
     byte [][] ret = new byte[n][];
     for(int i=0;i<n;i++) {



Mime
View raw message