hbase-commits mailing list archives

From te...@apache.org
Subject hbase git commit: HBASE-17292 Add observer notification before bulk loaded hfile is moved to region directory
Date Thu, 15 Dec 2016 17:47:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/14123 7c1eb6536 -> e1a992d6b


HBASE-17292 Add observer notification before bulk loaded hfile is moved to region directory


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1a992d6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1a992d6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1a992d6

Branch: refs/heads/14123
Commit: e1a992d6b03d25cf207c8378de63f73a889e5ac5
Parents: 7c1eb65
Author: tedyu <yuzhihong@gmail.com>
Authored: Thu Dec 15 09:47:43 2016 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Thu Dec 15 09:47:43 2016 -0800

----------------------------------------------------------------------
 .../hbase/coprocessor/BaseRegionObserver.java   | 10 +++
 .../hbase/coprocessor/RegionObserver.java       | 25 ++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 84 ++++++++++++++------
 .../hbase/regionserver/HRegionFileSystem.java   | 24 ++++--
 .../hadoop/hbase/regionserver/HStore.java       | 15 +++-
 .../hbase/regionserver/RSRpcServices.java       | 19 +++--
 .../regionserver/RegionCoprocessorHost.java     | 20 +++++
 .../regionserver/SecureBulkLoadManager.java     | 81 ++++++++++---------
 8 files changed, 200 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 3442b64..64b185d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -511,6 +511,16 @@ public class BaseRegionObserver implements RegionObserver {
   }
 
   @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  }
+
+  @Override
+  public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException {
+  }
+
+  @Override
   public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
     boolean hasLoaded) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index c2cc0e8..1ebb494 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1383,6 +1383,31 @@ public interface RegionObserver extends Coprocessor {
     List<Pair<byte[], String>> familyPaths) throws IOException;
 
   /**
+   * Called before moving bulk loaded hfile to region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }
+   * Each pair is for the same hfile.
+   * @throws IOException
+   */
+  default void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  }
+
+  /**
+   * Called after moving bulk loaded hfile to region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param srcPath Path to file before the move
+   * @param dstPath Path to file after the move
+   */
+  default void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException {
+  }
+
+  /**
    * Called after bulkLoadHFile.
    *
    * @param ctx

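The two hooks above give a coprocessor a window before and after each staged hfile is moved into the region directory. As a purely illustrative sketch (not part of this commit; the class name and logging are invented), an observer built against this branch could look like:

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;

public class BulkLoadAuditObserver extends BaseRegionObserver {
  private static final Log LOG = LogFactory.getLog(BulkLoadAuditObserver.class);

  @Override
  public void preCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
      byte[] family, List<Pair<Path, Path>> pairs) throws IOException {
    // Nothing has been moved yet: getFirst() is the staging location,
    // getSecond() is where the hfile will land in the region directory.
    for (Pair<Path, Path> p : pairs) {
      LOG.info("About to commit " + p.getFirst() + " as " + p.getSecond());
    }
  }

  @Override
  public void postCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
      byte[] family, Path srcPath, Path dstPath) throws IOException {
    // Called per hfile once the rename into the region directory has happened.
    LOG.info("Committed " + srcPath + " to " + dstPath);
  }
}

Such an observer would be wired in like any other RegionObserver, e.g. via hbase.coprocessor.region.classes.
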
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 938fbf5..6160b5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5554,42 +5554,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
+      Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
+          new TreeMap<>(Bytes.BYTES_COMPARATOR);
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
         HStore store = getHStore(familyName);
+        if (!familyWithFinalPath.containsKey(familyName)) {
+          familyWithFinalPath.put(familyName, new ArrayList<>());
+        }
+        List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
         try {
           String finalPath = path;
           if (bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
           }
-          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
-
-          // Note the size of the store file
-          try {
-            FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
-            storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
-                .getLen());
-          } catch (IOException e) {
-            LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
-            storeFilesSizes.put(commitedStoreFile.getName(), 0L);
-          }
-
-          if(storeFiles.containsKey(familyName)) {
-            storeFiles.get(familyName).add(commitedStoreFile);
-          } else {
-            List<Path> storeFileNames = new ArrayList<Path>();
-            storeFileNames.add(commitedStoreFile);
-            storeFiles.put(familyName, storeFileNames);
-          }
-          if (bulkLoadListener != null) {
-            bulkLoadListener.doneBulkLoad(familyName, path);
-          }
+          Path commitedStoreFile = store.preBulkLoadHFile(finalPath, seqId);
+          lst.add(new Pair<Path, Path>(new Path(finalPath), commitedStoreFile));
         } catch (IOException ioe) {
           // A failure here can cause an atomicity violation that we currently
           // cannot recover from since it is likely a failed HDFS operation.
 
-          // TODO Need a better story for reverting partial failures due to HDFS.
           LOG.error("There was a partial failure due to IO when attempting to" +
               " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
           if (bulkLoadListener != null) {
@@ -5604,6 +5589,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
+      if (this.getCoprocessorHost() != null) {
+        for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+          this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
+        }
+      }
+      for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+        byte[] familyName = entry.getKey();
+        for (Pair<Path, Path> p : entry.getValue()) {
+          String path = p.getFirst().toString();
+          Path commitedStoreFile = p.getSecond();
+          HStore store = getHStore(familyName);
+          try {
+            store.bulkLoadHFile(familyName, path, commitedStoreFile);
+            // Note the size of the store file
+            try {
+              FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
+              storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
+                  .getLen());
+            } catch (IOException e) {
+              LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
+              storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+            }
+
+            if(storeFiles.containsKey(familyName)) {
+              storeFiles.get(familyName).add(commitedStoreFile);
+            } else {
+              List<Path> storeFileNames = new ArrayList<Path>();
+              storeFileNames.add(commitedStoreFile);
+              storeFiles.put(familyName, storeFileNames);
+            }
+            if (bulkLoadListener != null) {
+              bulkLoadListener.doneBulkLoad(familyName, path);
+            }
+          } catch (IOException ioe) {
+            // A failure here can cause an atomicity violation that we currently
+            // cannot recover from since it is likely a failed HDFS operation.
+
+            // TODO Need a better story for reverting partial failures due to HDFS.
+            LOG.error("There was a partial failure due to IO when attempting to" +
+                " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
+            if (bulkLoadListener != null) {
+              try {
+                bulkLoadListener.failedBulkLoad(familyName, path);
+              } catch (Exception ex) {
+                LOG.error("Error while calling failedBulkLoad for family " +
+                    Bytes.toString(familyName) + " with path " + path, ex);
+              }
+            }
+            throw ioe;
+          }
+        }
+      }
+
       isSuccessful = true;
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {

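The HRegion change restructures bulkLoadHFiles from a single loop that staged and committed each hfile in turn into two phases: first every hfile is staged and its (staging path, final path) pair recorded per family, then preCommitStoreFile fires with the complete list, and only then are the renames performed. A minimal, self-contained sketch of that ordering, with all HBase types and the patch's error handling replaced by stand-ins:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class TwoPhaseBulkLoadSketch {
  public static void main(String[] args) {
    List<String[]> familyPaths = Arrays.asList(
        new String[] { "cf1", "/staging/hfile-a" },
        new String[] { "cf1", "/staging/hfile-b" },
        new String[] { "cf2", "/staging/hfile-c" });

    // Phase 1: choose a destination for every staged hfile, grouped by family
    // (stands in for HStore.preBulkLoadHFile, which reserves the name only).
    Map<String, List<String[]>> familyWithFinalPath = new TreeMap<String, List<String[]>>();
    for (String[] p : familyPaths) {
      String family = p[0];
      String staging = p[1];
      String dst = "/region/" + family + staging.substring(staging.lastIndexOf('/'));
      if (!familyWithFinalPath.containsKey(family)) {
        familyWithFinalPath.put(family, new ArrayList<String[]>());
      }
      familyWithFinalPath.get(family).add(new String[] { staging, dst });
    }

    // Observer notification point: preCommitStoreFile sees every pair for a
    // family before any file has moved.
    for (Map.Entry<String, List<String[]>> e : familyWithFinalPath.entrySet()) {
      for (String[] pair : e.getValue()) {
        System.out.println("preCommitStoreFile " + e.getKey() + ": " + pair[0] + " -> " + pair[1]);
      }
    }

    // Phase 2: only now are the staged files renamed into the region directory
    // (stands in for HStore.bulkLoadHFile / HRegionFileSystem.commitStoreFile).
    for (Map.Entry<String, List<String[]>> e : familyWithFinalPath.entrySet()) {
      for (String[] pair : e.getValue()) {
        System.out.println("rename " + pair[0] + " -> " + pair[1]);
      }
    }
  }
}
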
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 50382a4..51b6543 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -365,19 +365,21 @@ public class HRegionFileSystem {
    * @throws IOException
    */
   public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
-    return commitStoreFile(familyName, buildPath, -1, false);
+    Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
+    return commitStoreFile(buildPath, dstPath);
   }
 
   /**
-   * Move the file from a build/temp location to the main family store directory.
+   * Generate the filename in the main family store directory for moving the file from a build/temp
+   *  location.
    * @param familyName Family that will gain the file
    * @param buildPath {@link Path} to the file to commit.
    * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
    * @param generateNewName False if you want to keep the buildPath name
-   * @return The new {@link Path} of the committed file
+   * @return The new {@link Path} of the file to be committed
    * @throws IOException
    */
-  private Path commitStoreFile(final String familyName, final Path buildPath,
+  private Path preCommitStoreFile(final String familyName, final Path buildPath,
       final long seqNum, final boolean generateNewName) throws IOException {
     Path storeDir = getStoreDir(familyName);
     if(!fs.exists(storeDir) && !createDir(storeDir))
@@ -394,6 +396,17 @@ public class HRegionFileSystem {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Committing store file " + buildPath + " as " + dstPath);
     }
+    return dstPath;
+  }
+
+  /*
+   * Moves file from staging dir to region dir
+   * @param buildPath {@link Path} to the file to commit.
+   * @param dstPath {@link Path} to the file under region dir
+   * @return The {@link Path} of the committed file
+   * @throws IOException
+   */
+  Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
     // buildPath exists, therefore not doing an exists() check.
     if (!rename(buildPath, dstPath)) {
       throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
@@ -401,7 +414,6 @@ public class HRegionFileSystem {
     return dstPath;
   }
 
-
   /**
    * Moves multiple store files to the relative region's family store directory.
    * @param storeFiles list of store files divided by family
@@ -469,7 +481,7 @@ public class HRegionFileSystem {
       srcPath = tmpPath;
     }
 
-    return commitStoreFile(familyName, srcPath, seqNum, true);
+    return preCommitStoreFile(familyName, srcPath, seqNum, true);
   }
 
   // ===========================================================================

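The old private commitStoreFile(familyName, buildPath, seqNum, generateNewName) both computed the destination name and renamed the file; the patch splits those responsibilities so coprocessors can be notified in between. A toy version of the split against the plain Hadoop FileSystem API (method names hypothetical):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class CommitSplitSketch {
  // Prepare: decide where the file will land and make sure the store dir
  // exists, but move nothing (mirrors preCommitStoreFile in the patch).
  static Path preCommit(FileSystem fs, Path storeDir, Path buildPath) throws IOException {
    if (!fs.exists(storeDir) && !fs.mkdirs(storeDir)) {
      throw new IOException("Failed creating " + storeDir);
    }
    return new Path(storeDir, buildPath.getName());
  }

  // Commit: perform the rename into the chosen destination (mirrors the new
  // two-argument commitStoreFile).
  static Path commit(FileSystem fs, Path buildPath, Path dstPath) throws IOException {
    if (!fs.rename(buildPath, dstPath)) {
      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
    }
    return dstPath;
  }
}
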
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 70928a2..06fbf75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -770,9 +770,20 @@ public class HStore implements Store {
    * @param srcPathStr
    * @param seqNum sequence Id associated with the HFile
    */
-  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Path preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
-    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+  }
+
+  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
+    Path srcPath = new Path(srcPathStr);
+    try {
+      fs.commitStoreFile(srcPath, dstPath);
+    } finally {
+      if (this.getCoprocessorHost() != null) {
+        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
+      }
+    }
 
     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
         + dstPath + " - updating store file list.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9edbba6..d288911 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2124,15 +2124,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         if (region.getCoprocessorHost() != null) {
           bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
         }
-        if (!bypass) {
-          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile());
-          if (map != null) {
-            loaded = true;
+        try { 
+          if (!bypass) {
+            map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
+                request.getCopyFile());
+            if (map != null) {
+              loaded = true;
+            }
+          }
+        } finally {
+          if (region.getCoprocessorHost() != null) {
+            loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
           }
-        }
-        if (region.getCoprocessorHost() != null) {
-          loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
         }
       } else {
         // secure bulk load

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index f6388fa..20bf5c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1548,6 +1548,26 @@ public class RegionCoprocessorHost
     });
   }
 
+  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+      throws IOException {
+    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.preCommitStoreFile(ctx, family, pairs);
+      }
+    });
+  }
+  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+      }
+    });
+  }
+
   /**
    * @param familyPaths pairs of { CF, file path } submitted for bulk load
    * @param map Map of CF to List of file paths for the final loaded files

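Both new host methods follow the existing RegionCoprocessorHost pattern: wrap the observer callback in a RegionOperation and hand it to execOperation, which short-circuits when no coprocessors are loaded. A generic, simplified stand-in for that dispatch (not the real execOperation):

import java.util.List;
import java.util.function.Consumer;

public final class HostDispatchSketch<O> {
  private final List<O> observers;

  public HostDispatchSketch(List<O> observers) {
    this.observers = observers;
  }

  // Runs the callback against every registered observer; returns false when
  // there is nothing to do, mirroring the execOperation(null) short-circuit.
  public boolean exec(Consumer<O> op) {
    if (observers.isEmpty()) {
      return false;
    }
    for (O observer : observers) {
      op.accept(observer); // e.g. o -> o.preCommitStoreFile(ctx, family, pairs)
    }
    return true;
  }
}
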
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1a992d6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 8b44f22..7a18cbb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -202,52 +202,55 @@ public class SecureBulkLoadManager {
     boolean loaded = false;
     Map<byte[], List<Path>> map = null;
 
-    if (!bypass) {
-      // Get the target fs (HBase region server fs) delegation token
-      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
-      // the 'request user' necessary token to operate on the target fs.
-      // After this point the 'doAs' user will hold two tokens, one for the source fs
-      // ('request user'), another for the target fs (HBase region server principal).
-      if (userProvider.isHadoopSecurityEnabled()) {
-        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
-        targetfsDelegationToken.acquireDelegationToken(fs);
-
-        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
-        if (targetFsToken != null
-            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
-          ugi.addToken(targetFsToken);
+    try {
+      if (!bypass) {
+        // Get the target fs (HBase region server fs) delegation token
+        // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+        // the 'request user' necessary token to operate on the target fs.
+        // After this point the 'doAs' user will hold two tokens, one for the source fs
+        // ('request user'), another for the target fs (HBase region server principal).
+        if (userProvider.isHadoopSecurityEnabled()) {
+          FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
+          targetfsDelegationToken.acquireDelegationToken(fs);
+
+          Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
+          if (targetFsToken != null
+              && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
+            ugi.addToken(targetFsToken);
+          }
         }
-      }
 
-      map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
-        @Override
-        public Map<byte[], List<Path>> run() {
-          FileSystem fs = null;
-          try {
-            fs = FileSystem.get(conf);
-            for(Pair<byte[], String> el: familyPaths) {
-              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
-              if(!fs.exists(stageFamily)) {
-                fs.mkdirs(stageFamily);
-                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
+        map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
+          @Override
+          public Map<byte[], List<Path>> run() {
+            FileSystem fs = null;
+            try {
+              fs = FileSystem.get(conf);
+              for(Pair<byte[], String> el: familyPaths) {
+                Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
+                if(!fs.exists(stageFamily)) {
+                  fs.mkdirs(stageFamily);
+                  fs.setPermission(stageFamily, PERM_ALL_ACCESS);
+                }
               }
+              //We call bulkLoadHFiles as requesting user
+              //To enable access prior to staging
+              return region.bulkLoadHFiles(familyPaths, true,
+                  new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+            } catch (Exception e) {
+              LOG.error("Failed to complete bulk load", e);
             }
-            //We call bulkLoadHFiles as requesting user
-            //To enable access prior to staging
-            return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
-          } catch (Exception e) {
-            LOG.error("Failed to complete bulk load", e);
+            return null;
           }
-          return null;
+        });
+        if (map != null) {
+          loaded = true;
         }
-      });
-      if (map != null) {
-        loaded = true;
       }
-    }
-    if (region.getCoprocessorHost() != null) {
-       region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
+    } finally {
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
+      }
     }
     return map;
   }

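The SecureBulkLoadManager hunk is mostly re-indentation: the pre-existing doAs block is wrapped in try/finally so postBulkLoadHFile fires even when staging or loading throws. For readers unfamiliar with the doAs idiom itself, a minimal stand-alone sketch (illustrative only; names are invented):

import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public final class DoAsSketch {
  // Opens the filesystem as the given user rather than the process principal,
  // the same shape as the ugi.doAs(...) call in the hunk above.
  static FileSystem openAsUser(UserGroupInformation ugi, final Configuration conf) {
    return ugi.doAs(new PrivilegedAction<FileSystem>() {
      @Override
      public FileSystem run() {
        try {
          return FileSystem.get(conf);
        } catch (IOException e) {
          return null; // the patch likewise logs and returns null on failure
        }
      }
    });
  }
}
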
