hbase-commits mailing list archives

From: ndimi...@apache.org
Subject: hbase git commit: HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path, HTable) doesn't handle unmanaged connections when using SecureBulkLoad
Date: Mon, 15 Jun 2015 19:26:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 573e4b4f6 -> a459dd667


HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a459dd66
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a459dd66
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a459dd66

Branch: refs/heads/branch-1.0
Commit: a459dd667ce16b3bf471ade9778eb27a41a1ddda
Parents: 573e4b4
Author: Nick Dimiduk <ndimiduk@apache.org>
Authored: Sun Jun 14 15:45:10 2015 -0700
Committer: Nick Dimiduk <ndimiduk@apache.org>
Committed: Mon Jun 15 12:22:58 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 81 ++++++++++++--------
 .../mapreduce/TestLoadIncrementalHFiles.java    | 79 +++++++++++--------
 2 files changed, 96 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
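
For context on the call path this patch changes, a minimal caller-side sketch (not part of the commit; the class name, argument handling, and table/directory values are illustrative assumptions): it bulk loads HFiles through a table obtained from an unmanaged ConnectionFactory connection, the same combination the updated test below exercises.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path hfofDir = new Path(args[0]);                 // directory of per-family subdirs holding HFiles
    TableName tableName = TableName.valueOf(args[1]); // existing target table

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // Unmanaged connection: the caller creates and closes it; the table handle
    // obtained from it is what doBulkLoad(Path, HTable) receives here.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         HTable table = (HTable) conn.getTable(tableName)) {
      loader.doBulkLoad(hfofDir, table);
    }
  }
}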


http://git-wip-us.apache.org/repos/asf/hbase/blob/a459dd66/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 05ac012..8d4a002 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -65,11 +65,13 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
@@ -288,11 +290,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void doBulkLoad(Path hfofDir, final HTable table)
     throws TableNotFoundException, IOException
   {
-    final HConnection conn = table.getConnection();
+    boolean closeConnWhenFinished = false;
+    HConnection conn = table.getConnection();
+    Table t = table;
+
+    if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+      LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+      // can only use unmanaged connections from here on out.
+      conn = (HConnection) ConnectionFactory.createConnection(table.getConfiguration());
+      t = conn.getTable(table.getName());
+      closeConnWhenFinished = true;
+      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+        throw new RuntimeException("Failed to create unmanaged connection.");
+      }
+    }
 
-    if (!conn.isTableAvailable(table.getName())) {
+    if (!conn.isTableAvailable(t.getName())) {
       throw new TableNotFoundException("Table " +
-          Bytes.toStringBinary(table.getTableName()) +
+          Bytes.toStringBinary(t.getName().getName()) +
           "is not currently available.");
     }
 
@@ -313,7 +328,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     try {
       discoverLoadQueue(queue, hfofDir);
       // check whether there is invalid family name in HFiles to be bulkloaded
-      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+      Collection<HColumnDescriptor> families = t.getTableDescriptor().getFamilies();
       ArrayList<String> familyNames = new ArrayList<String>(families.size());
       for (HColumnDescriptor family : families) {
         familyNames.add(family.getNameAsString());
@@ -331,7 +346,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         String msg =
             "Unmatched family names found: unmatched family names in HFiles to be bulkloaded:
"
                 + unmatchedFamilies + "; valid family names of table "
-                + Bytes.toString(table.getTableName()) + " are: " + familyNames;
+                + Bytes.toString(t.getName().getName()) + " are: " + familyNames;
         LOG.error(msg);
         throw new IOException(msg);
       }
@@ -349,46 +364,48 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       // fs is the source filesystem
       fsDelegationToken.acquireDelegationToken(fs);
       if(isSecureBulkLoadEndpointAvailable()) {
-        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
+        bulkToken = new SecureBulkLoadClient(t).prepareBulkLoad(t.getName());
       }
 
       // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
         // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
-        if (count != 0) {
-          LOG.info("Split occured while grouping HFiles, retry attempt " +
-              + count + " with " + queue.size() + " files remaining to group or split");
-        }
+        try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+          final Pair<byte[][], byte[][]> startEndKeys = rl.getStartEndKeys();
+          if (count != 0) {
+            LOG.info("Split occured while grouping HFiles, retry attempt " +
+                +count + " with " + queue.size() + " files remaining to group or split");
+          }
 
-        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
-        if (maxRetries != 0 && count >= maxRetries) {
-          throw new IOException("Retry attempted " + count +
-            " times without completing, bailing out");
-        }
-        count++;
+          int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+          if (maxRetries != 0 && count >= maxRetries) {
+            throw new IOException("Retry attempted " + count +
+                " times without completing, bailing out");
+          }
+          count++;
 
-        // Using ByteBuffer for byte[] equality semantics
-        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-            pool, queue, startEndKeys);
+          // Using ByteBuffer for byte[] equality semantics
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase((HTable) t,
+              pool, queue, startEndKeys);
 
-        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-          // Error is logged inside checkHFilesCountPerRegionPerFamily.
-          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
-            + " hfiles to one family of one region");
-        }
+          if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+            // Error is logged inside checkHFilesCountPerRegionPerFamily.
+            throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+                + " hfiles to one family of one region");
+          }
 
-        bulkLoadPhase(table, conn, pool, queue, regionGroups);
+          bulkLoadPhase(t, conn, pool, queue, regionGroups);
 
-        // NOTE: The next iteration's split / group could happen in parallel to
-        // atomic bulkloads assuming that there are splits and no merges, and
-        // that we can atomically pull out the groups we want to retry.
+          // NOTE: The next iteration's split / group could happen in parallel to
+          // atomic bulkloads assuming that there are splits and no merges, and
+          // that we can atomically pull out the groups we want to retry.
+        }
       }
 
     } finally {
       fsDelegationToken.releaseDelegationToken();
       if(bulkToken != null) {
-        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+        new SecureBulkLoadClient(t).cleanupBulkLoad(bulkToken);
       }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
@@ -401,6 +418,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         }
         LOG.error(err);
       }
+      if (closeConnWhenFinished) {
+        t.close();
+        conn.close();
+      }
     }
 
     if (queue != null && !queue.isEmpty()) {
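
Background sketch (illustrative, not part of the commit): the patch keys on whether the supplied HTable's connection is "managed". Assuming a table named "mytable" and the 1.x client API, the two acquisition styles look roughly like this; the managed form is what the new ClusterConnection.isManaged() check detects before opening a private unmanaged connection for the duration of the load.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;

public class ConnectionKinds {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName name = TableName.valueOf("mytable"); // illustrative table name

    // Managed: the deprecated HTable(Configuration, TableName) constructor
    // obtains a shared, library-managed connection behind the scenes.
    try (HTable managed = new HTable(conf, name)) {
      // ... reads/writes go through the library-managed connection ...
    }

    // Unmanaged: the caller owns the connection's lifecycle; tables obtained
    // from it can be handed to doBulkLoad directly.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table unmanaged = conn.getTable(name)) {
      // ... reads/writes share the caller-owned connection ...
    }
  }
}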

http://git-wip-us.apache.org/repos/asf/hbase/blob/a459dd66/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index d5a9a86..51ba98a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
 
-    int hfileIdx = 0;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
-    }
-    int expectedRows = hfileIdx * 1000;
+    for (boolean managed : new boolean[] { true, false }) {
+      Path dir = util.getDataTestDirOnTestFS(testName);
+      FileSystem fs = util.getTestFileSystem();
+      dir = dir.makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+      int hfileIdx = 0;
+      for (byte[][] range : hfileRanges) {
+        byte[] from = range[0];
+        byte[] to = range[1];
+        HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+            + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      }
+      int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
-      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
-    }
+      if (preCreateTable) {
+        util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+      }
 
-    final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+      final TableName tableName = htd.getTableName();
+      if (!util.getHBaseAdmin().tableExists(tableName)) {
+        util.getHBaseAdmin().createTable(htd);
+      }
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
 
-    Table table = new HTable(util.getConfiguration(), tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
+      if (managed) {
+        try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+          loader.doBulkLoad(dir, table);
+          assertEquals(expectedRows, util.countRows(table));
+        }
+      } else {
+        try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+            HTable table = (HTable) conn.getTable(tableName)) {
+          loader.doBulkLoad(dir, table);
+        }
+      }
 
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
+      // verify staging folder has been cleaned up
+      Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+      if (fs.exists(stagingBasePath)) {
+        FileStatus[] files = fs.listStatus(stagingBasePath);
+        for (FileStatus file : files) {
+          assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+              file.getPath().getName() != "DONOTERASE");
+        }
       }
-    }
 
-    util.deleteTable(tableName);
+      util.deleteTable(tableName);
+    }
   }
 
   /**

