hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [1/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated
Date Sun, 03 Sep 2017 12:05:01 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 49986e9df -> a37417c25


http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0fe79d1..199c2c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -60,13 +60,13 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
new file mode 100644
index 0000000..3ebda29
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.security.UserProvider;
+
+/**
+ * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying
+ * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used
+ * to do the authentication, which requires a Kerberos ticket (which we currently don't have in
+ * tests).
+ * <p>
+ * This should only be used for <b>TESTING</b>.
+ */
+public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
+
+  /**
+   * Always reports HBase security as disabled.
+   *
+   * @return {@code false}, unconditionally
+   */
+  @Override
+  public boolean isHBaseSecurityEnabled() {
+    return false;
+  }
+
+  /**
+   * Always reports Hadoop security as enabled.
+   *
+   * @return {@code true}, unconditionally
+   */
+  @Override
+  public boolean isHadoopSecurityEnabled() {
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 6583366..1e38179 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -25,6 +25,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -93,7 +99,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
@@ -118,11 +123,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.log4j.Level;
@@ -134,11 +137,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 
 /**
  * Performs authorization checks for common operations, according to different

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
new file mode 100644
index 0000000..3f7d441
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This class provides shims for HBase to interact with the Hadoop 1.0.x and the
+ * Hadoop 0.23.x series.
+ *
+ * NOTE: No testing done against 0.22.x, or 0.21.x.
+ */
+public abstract class MapreduceTestingShim {
+  /** Shim picked by the static initializer below; every static helper delegates to it. */
+  private static final MapreduceTestingShim instance;
+  /** Empty parameter-type list used for reflective lookups of no-argument methods. */
+  private static final Class<?>[] emptyParam = new Class<?>[] {};
+
+  static {
+    MapreduceTestingShim shim;
+    try {
+      // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
+      Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      shim = new MapreduceV2Shim();
+    } catch (Exception e) {
+      // Probe failed, so fall back to the V1 shim.
+      shim = new MapreduceV1Shim();
+    }
+    instance = shim;
+  }
+
+  /**
+   * Creates a job context for the given configuration.
+   * @param jobConf configuration handed to the new context
+   * @return the new job context
+   * @throws IOException declared for implementations that need it
+   */
+  public abstract JobContext newJobContext(Configuration jobConf) throws IOException;
+
+  /**
+   * Creates a job for the given configuration.
+   * @param conf configuration handed to the new job
+   * @return the new job
+   * @throws IOException declared for implementations that need it
+   */
+  public abstract Job newJob(Configuration conf) throws IOException;
+
+  /**
+   * Looks up a job configuration from a mini MR cluster.
+   * @param cluster cluster to query; may be {@code null}
+   * @return the configuration, or {@code null} when it cannot be obtained
+   */
+  public abstract JobConf obtainJobConf(MiniMRCluster cluster);
+
+  /**
+   * @return the name of the configuration property that holds the MR output directory
+   */
+  public abstract String obtainMROutputDirProp();
+
+  /** Static facade for {@link #newJobContext(Configuration)} on the selected shim. */
+  public static JobContext createJobContext(Configuration jobConf) throws IOException {
+    return instance.newJobContext(jobConf);
+  }
+
+  /** Static facade for {@link #obtainJobConf(MiniMRCluster)} on the selected shim. */
+  public static JobConf getJobConf(MiniMRCluster cluster) {
+    return instance.obtainJobConf(cluster);
+  }
+
+  /** Static facade for {@link #newJob(Configuration)} on the selected shim. */
+  public static Job createJob(Configuration conf) throws IOException {
+    return instance.newJob(conf);
+  }
+
+  /** Static facade for {@link #obtainMROutputDirProp()} on the selected shim. */
+  public static String getMROutputDirProp() {
+    return instance.obtainMROutputDirProp();
+  }
+
+  /** Shim used when the probe class in the static initializer could not be loaded. */
+  private static class MapreduceV1Shim extends MapreduceTestingShim {
+    @Override
+    public JobContext newJobContext(Configuration jobConf) throws IOException {
+      // Implementing:
+      // return new JobContext(jobConf, new JobID());
+      JobID jobId = new JobID();
+      try {
+        Constructor<JobContext> c =
+            JobContext.class.getConstructor(Configuration.class, JobID.class);
+        return c.newInstance(jobConf, jobId);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to instantiate new JobContext(jobConf, new JobID())", e);
+      }
+    }
+
+    @Override
+    public Job newJob(Configuration conf) throws IOException {
+      // Implementing:
+      // return new Job(conf);
+      try {
+        Constructor<Job> c = Job.class.getConstructor(Configuration.class);
+        return c.newInstance(conf);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to instantiate new Job(conf)", e);
+      }
+    }
+
+    @Override
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      if (cluster == null) {
+        return null;
+      }
+      try {
+        Object runner = cluster.getJobTrackerRunner();
+        Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
+        Object tracker = meth.invoke(runner);
+        Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
+        return (JobConf) m.invoke(tracker);
+      } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+        // Best effort: callers get null when the reflective lookup does not work.
+        return null;
+      }
+    }
+
+    @Override
+    public String obtainMROutputDirProp() {
+      return "mapred.output.dir";
+    }
+  }
+
+  /** Shim used when the probe class in the static initializer is present. */
+  private static class MapreduceV2Shim extends MapreduceTestingShim {
+    @Override
+    public JobContext newJobContext(Configuration jobConf) {
+      return newJob(jobConf);
+    }
+
+    @Override
+    public Job newJob(Configuration jobConf) {
+      // Implementing:
+      // return Job.getInstance(jobConf);
+      try {
+        Method m = Job.class.getMethod("getInstance", Configuration.class);
+        return (Job) m.invoke(null, jobConf); // static method, then arg
+      } catch (Exception e) {
+        // Chain the cause instead of printing it, so the failure reason travels with the exception.
+        throw new IllegalStateException(
+            "Failed to return from Job.getInstance(jobConf)", e);
+      }
+    }
+
+    @Override
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      if (cluster == null) {
+        // Same contract as the V1 shim: no cluster, no configuration.
+        return null;
+      }
+      try {
+        Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
+        return (JobConf) meth.invoke(cluster);
+      } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+        // Best effort: callers get null when the reflective lookup does not work.
+        return null;
+      }
+    }
+
+    @Override
+    public String obtainMROutputDirProp() {
+      // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
+      // from Hadoop 0.23.x.  If we use the source directly we break the hadoop 1.x compile.
+      return "mapreduce.output.fileoutputformat.outputdir";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
new file mode 100644
index 0000000..7e4d40e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -0,0 +1,723 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
+ * faster than the full MR cluster tests in TestHFileOutputFormat
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFiles {
+  @Rule
+  public TestName tn = new TestName();
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
+  private static final byte[] FAMILY = Bytes.toBytes("myfam");
+  private static final String NAMESPACE = "bulkNS";
+
+  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
+
+  private static final byte[][] SPLIT_KEYS =
+      new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
+
+  static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Adjusts the testing utility's configuration, starts the mini cluster and creates the test
+   * namespace.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration clusterConf = util.getConfiguration();
+    // set the region coprocessor list to the empty string
+    clusterConf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    clusterConf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    String codecClassName = KeyValueCodecWithTags.class.getCanonicalName();
+    clusterConf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
+
+    util.startMiniCluster();
+    setupNamespace();
+  }
+
+  /** Creates the {@code NAMESPACE} namespace through the testing utility's admin. */
+  protected static void setupNamespace() throws Exception {
+    NamespaceDescriptor descriptor = NamespaceDescriptor.create(NAMESPACE).build();
+    util.getAdmin().createNamespace(descriptor);
+  }
+
+  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Same HFile ranges as {@link #testSimpleLoad()}, but runs with {@code useMap = true}.
+   */
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithMap() throws Exception {
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+        { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    };
+    runTest("testSimpleLoadWithMap", BloomType.NONE, hfileRanges, true);
+  }
+
+  /**
+   * Loads two HFiles whose key ranges are meant to fit snugly inside the regions of the table.
+   */
+  @Test(timeout = 120000)
+  public void testSimpleLoad() throws Exception {
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+        { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    };
+    runTest("testSimpleLoad", BloomType.NONE, hfileRanges);
+  }
+
+  /**
+   * Runs the simple load against a table named after this test method with {@code copyFiles}
+   * enabled and {@code preCreateTable}/{@code useMap} disabled.
+   */
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithFileCopy() throws Exception {
+    String methodName = tn.getMethodName();
+    byte[] rawTableName = Bytes.toBytes("mytable_" + methodName);
+    TableDescriptor descriptor = buildHTD(TableName.valueOf(rawTableName), BloomType.NONE);
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+        { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    };
+    runTest(methodName, descriptor, BloomType.NONE, false, null, hfileRanges, false, true);
+  }
+
+  /**
+   * Loads HFiles whose key ranges are meant to cross the region boundaries of the table.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingLoad() throws Exception {
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+        { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+    };
+    runTest("testRegionCrossingLoad", BloomType.NONE, hfileRanges);
+  }
+
+  /**
+   * Region-crossing load into a column family configured with a ROW bloom filter.
+   */
+  @Test(timeout = 60000)
+  public void testRegionCrossingRowBloom() throws Exception {
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+        { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+    };
+    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, hfileRanges);
+  }
+
+  /**
+   * Region-crossing load into a column family configured with a ROWCOL bloom filter.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingRowColBloom() throws Exception {
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+        { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+    };
+    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, hfileRanges);
+  }
+
+  /**
+   * Pre-splits the table on six keys and loads HFiles whose boundaries differ from that
+   * pre-split.
+   */
+  @Test(timeout = 120000)
+  public void testSimpleHFileSplit() throws Exception {
+    byte[][] tableSplitKeys = { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
+        { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
+    };
+    runTest("testHFileSplit", BloomType.NONE, tableSplitKeys, hfileRanges);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the region boundaries and
+   * have different boundaries than the table pre-split. Delegates to the shared helper with
+   * {@link BloomType#NONE}.
+   */
+  @Test(timeout = 60000)
+  public void testRegionCrossingHFileSplit() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.NONE);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the region boundaries, use a
+   * ROW bloom filter, and have different boundaries than the table pre-split. Delegates to the
+   * shared helper with {@link BloomType#ROW}.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.ROW);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the region boundaries, use a
+   * ROWCOL bloom filter, and have different boundaries than the table pre-split. Delegates to the
+   * shared helper with {@link BloomType#ROWCOL}.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.ROWCOL);
+  }
+
+  /**
+   * Loads a single HFile spanning "aaaa".."zzz" into a table pre-split on many keys.
+   */
+  @Test
+  public void testSplitALot() throws Exception {
+    byte[][] tableSplitKeys = { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
+        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
+        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
+        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
+        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), };
+    byte[][][] hfileRanges = { { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, };
+    runTest("testSplitALot", BloomType.NONE, tableSplitKeys, hfileRanges);
+  }
+
+  /**
+   * Shared body of the region-crossing HFile split tests.
+   * @param bloomType bloom filter type for the column family; also embedded in the test name
+   */
+  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
+    String testName = "testHFileSplit" + bloomType + "Bloom";
+    byte[][] tableSplitKeys = { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+    byte[][][] hfileRanges = {
+        { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+        { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+    };
+    runTest(testName, bloomType, tableSplitKeys, hfileRanges);
+  }
+
+  /**
+   * Builds a descriptor for {@code tableName} with the single column family {@code FAMILY}.
+   * @param tableName name of the table to describe
+   * @param bloomType bloom filter type set on the column family
+   * @return the table descriptor
+   */
+  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
+    TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptor family =
+        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build();
+    return tableBuilder.addColumnFamily(family).build();
+  }
+
+  /**
+   * Convenience overload that passes {@code null} table split keys.
+   */
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
+      throws Exception {
+    runTest(testName, bloomType, null, hfileRanges);
+  }
+
+  /**
+   * Convenience overload that passes {@code null} table split keys and forwards {@code useMap}.
+   */
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
+      throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, useMap);
+  }
+
+  /**
+   * Convenience overload that runs with {@code useMap = false}.
+   */
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges) throws Exception {
+    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
+  }
+
+  /**
+   * Runs the bulk load twice for table "mytable_" + testName: first in the default namespace,
+   * then in {@code NAMESPACE}. The table is pre-created only when split keys are supplied.
+   */
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap) throws Exception {
+    byte[] rawTableName = Bytes.toBytes("mytable_" + testName);
+    boolean preCreate = tableSplitKeys != null;
+
+    // First pass: table without an explicit namespace
+    runTest(testName, TableName.valueOf(rawTableName), bloomType, preCreate, tableSplitKeys,
+      hfileRanges, useMap);
+
+    // Second pass: same table name inside the test namespace
+    runTest(testName, TableName.valueOf(Bytes.toBytes(NAMESPACE), rawTableName), bloomType,
+      preCreate, tableSplitKeys, hfileRanges, useMap);
+  }
+
+  /**
+   * Builds the descriptor for {@code tableName} and runs the test with {@code copyFiles = false}.
+   */
+  private void runTest(String testName, TableName tableName, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+      throws Exception {
+    runTest(testName, buildHTD(tableName, bloomType), bloomType, preCreateTable, tableSplitKeys,
+      hfileRanges, useMap, false);
+  }
+
+  /**
+   * Writes one HFile per entry of {@code hfileRanges} under a per-test directory, bulk loads them
+   * into the table described by {@code htd} and asserts the resulting row count.
+   *
+   * @param testName name used for the data directory on the test file system
+   * @param htd descriptor of the target table
+   * @param util testing utility that supplies file system, admin, connection and configuration
+   * @param fam column family written into the HFiles; also the name of the family directory
+   * @param qual qualifier written into the HFiles
+   * @param preCreateTable create the table up front if it does not exist
+   * @param tableSplitKeys split keys used when this method creates the table
+   * @param hfileRanges one {from, to} pair per HFile to generate
+   * @param useMap load through the family-to-paths map API instead of command line arguments;
+   *          this also creates the table if it does not exist
+   * @param deleteFile in map mode, delete the last generated HFile before loading and verify
+   *          that it is not reported as loaded
+   * @param copyFiles set {@code ALWAYS_COPY_FILES} and verify the source HFiles still exist
+   * @param initRowCount rows assumed to be in the table before this load
+   * @param factor rows per generated HFile
+   * @return the number of rows this call expected to load
+   */
+  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
+      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
+      int initRowCount, int factor) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    Map<byte[], List<Path>> map = null;
+    List<Path> list = null;
+    if (useMap || copyFiles) {
+      list = new ArrayList<>();
+    }
+    if (useMap) {
+      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      map.put(fam, list);
+    }
+    Path last = null;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
+      if (list != null) {
+        // Track the file in copy mode too; otherwise the copyFiles check below has nothing to
+        // verify when useMap is false.
+        list.add(path);
+      }
+      if (useMap) {
+        last = path;
+      }
+    }
+    int expectedRows = hfileIdx * factor;
+
+    TableName tableName = htd.getTableName();
+    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
+      util.getAdmin().createTable(htd, tableSplitKeys);
+    }
+
+    Configuration conf = util.getConfiguration();
+    if (copyFiles) {
+      // Work on a copy so the setting does not leak into the shared configuration and thereby
+      // into later tests.
+      conf = new Configuration(conf);
+      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+    }
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    if (useMap) {
+      // Nothing to delete when no HFile was generated.
+      boolean dropLast = deleteFile && last != null;
+      if (dropLast) {
+        fs.delete(last, true);
+      }
+      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+      if (dropLast) {
+        // One HFile worth of rows is gone; each file holds 'factor' rows.
+        expectedRows -= factor;
+        for (LoadQueueItem item : loaded.keySet()) {
+          if (item.getFilePath().getName().equals(last.getName())) {
+            fail(last + " should be missing");
+          }
+        }
+      }
+    } else {
+      String[] args = { dir.toString(), tableName.toString() };
+      loader.run(args);
+    }
+
+    if (copyFiles) {
+      for (Path p : list) {
+        assertTrue(p + " should exist", fs.exists(p));
+      }
+    }
+
+    try (Table table = util.getConnection().getTable(tableName)) {
+      assertEquals(initRowCount + expectedRows, util.countRows(table));
+    }
+
+    return expectedRows;
+  }
+
+  /**
+   * Bulk loads through {@code loadHFiles} (FAMILY/QUALIFIER, 1000 rows per HFile, deleteFile
+   * enabled, no initial rows), checks the bulk load staging directory and deletes the table.
+   *
+   * @param bloomType not used by this overload; the bloom type comes from {@code htd}
+   */
+  private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+      boolean copyFiles) throws Exception {
+    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
+      useMap, true, copyFiles, 0, 1000);
+
+    final TableName tableName = htd.getTableName();
+    // verify staging folder has been cleaned up
+    Path stagingBasePath =
+        new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+    FileSystem fs = util.getTestFileSystem();
+    if (fs.exists(stagingBasePath)) {
+      for (FileStatus file : fs.listStatus(stagingBasePath)) {
+        // Compare by value. The previous reference comparison (!= on Strings) was always true,
+        // so this check could never fail.
+        // NOTE(review): assumes only the "DONOTERASE" marker may remain in the staging
+        // directory - confirm against the bulk load staging code.
+        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+          "DONOTERASE".equals(file.getPath().getName()));
+      }
+    }
+
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
+   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
+   * responses.
+   */
+  @Test(timeout = 60000)
+  public void testTagsSurviveBulkLoadSplit() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    // table has these split points
+    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
+        Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+
+    // creating an hfile that has values that span the split points.
+    byte[] from = Bytes.toBytes("ddd");
+    byte[] to = Bytes.toBytes("ooo");
+    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
+      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
+    int expectedRows = 1000;
+
+    TableName tableName = TableName.valueOf(tn.getMethodName());
+    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
+    util.getAdmin().createTable(htd, tableSplitKeys);
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+
+    // try-with-resources: a failure in close() is suppressed rather than replacing an assertion
+    // failure raised inside the block.
+    try (Table table = util.getConnection().getTable(tableName)) {
+      assertEquals(expectedRows, util.countRows(table));
+      HFileTestUtil.verifyTags(table);
+    }
+
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Test loading into a column family that does not exist. The load is expected to fail with an
+   * IOException whose message contains {@code EXPECTED_MSG_FOR_NON_EXISTING_FAMILY}.
+   */
+  @Test(timeout = 60000)
+  public void testNonexistentColumnFamilyLoad() throws Exception {
+    String testName = tn.getMethodName();
+    byte[][][] hFileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
+            new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
+
+    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
+    // set real family name to upper case on purpose to simulate the case that
+    // family name in HFiles is invalid
+    // Bytes.toString instead of new String(byte[]): the latter uses the platform charset.
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
+        .addColumnFamily(ColumnFamilyDescriptorBuilder
+            .of(Bytes.toBytes(Bytes.toString(FAMILY).toUpperCase(Locale.ROOT))))
+        .build();
+
+    try {
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
+      // fail() throws AssertionError, which the catch (Exception) below does not intercept
+      fail("Loading into table with non-existent family should have failed");
+    } catch (Exception e) {
+      assertTrue("IOException expected", e instanceof IOException);
+      // further check whether the exception message is correct
+      String errMsg = e.getMessage();
+      // null-safe: a missing message yields the assertion text below instead of an NPE
+      assertTrue(
+        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
+            "], current message: [" + errMsg + "]",
+        errMsg != null && errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
+    }
+  }
+
+  /**
+   * Runs the non-HFile-folder bulk load scenario with {@code preCreateTable} set to true, i.e.
+   * the target table is created before the load is started.
+   */
+  @Test(timeout = 120000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  /**
+   * Runs the non-HFile-folder bulk load scenario with {@code preCreateTable} set to false, i.e.
+   * the test does not create the target table itself before the load is started.
+   */
+  @Test(timeout = 120000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
+  /**
+   * Bulk loads from a directory that holds, next to one real HFile, a data file that is not an
+   * HFile inside the family directory, plus a "_logs" directory (with a sub-directory and another
+   * data file) that is not one of the table's families. We should be able to bulk load without
+   * getting the unmatched family exception. See HBASE-13037/HBASE-13227.
+   * @param tableName name of the table to load into; also names the test data directory
+   * @param preCreateTable if true the table is created with {@code FAMILY} before loading,
+   *          otherwise only a table handle is obtained from the connection
+   */
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+    Path unqualifiedDir = util.getDataTestDirOnTestFS(tableName);
+    FileSystem fs = util.getTestFileSystem();
+    final Path dir = unqualifiedDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    // family directory: one HFile with 500 rows and one file of filler bytes
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    Path hfile = new Path(familyDir, "hfile_0");
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, hfile, FAMILY, QUALIFIER,
+      Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
+
+    // non-family directory: a nested directory and another file of filler bytes
+    Path nonFamilyDir = new Path(dir, "_logs");
+    fs.mkdirs(nonFamilyDir);
+    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
+    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
+
+    Table table = null;
+    try {
+      TableName target = TableName.valueOf(tableName);
+      table = preCreateTable ? util.createTable(target, FAMILY)
+          : util.getConnection().getTable(target);
+
+      final String[] args = { dir.toString(), tableName };
+      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
+      assertEquals(500, util.countRows(table));
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      // always remove the test data, whether or not the load succeeded
+      fs.delete(dir, true);
+    }
+  }
+
+  /**
+   * Writes {@code size} bytes of filler data to {@code path}. Despite the name the content is
+   * deterministic: a 1024-byte block holding the repeating pattern 0x00..0xff is written over and
+   * over, with the last write truncated so that exactly {@code size} bytes are produced. Nothing
+   * is written when {@code size} is zero or negative (the file is still created).
+   * @param fs file system to create the file on
+   * @param path file to create
+   * @param size number of bytes to write
+   * @throws IOException if creating or writing the file fails
+   */
+  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
+    // try-with-resources replaces the manual try/finally close
+    try (FSDataOutputStream stream = fs.create(path)) {
+      byte[] data = new byte[1024];
+      for (int i = 0; i < data.length; ++i) {
+        data[i] = (byte) (i & 0xff);
+      }
+      // full blocks first, then whatever is left over
+      for (int remaining = size; remaining > 0; remaining -= data.length) {
+        stream.write(data, 0, Math.min(remaining, data.length));
+      }
+    }
+  }
+
+  /**
+   * Splits a 1000-row HFile at row "ggg" and checks that the bottom and top halves together still
+   * contain 1000 entries.
+   */
+  @Test(timeout = 120000)
+  public void testSplitStoreFile() throws IOException {
+    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
+    FileSystem fs = util.getTestFileSystem();
+    Path input = new Path(dir, "testhfile");
+    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, input, FAMILY, QUALIFIER,
+      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+    byte[] splitKey = Bytes.toBytes("ggg");
+    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), input, familyDesc, splitKey,
+      bottomOut, topOut);
+
+    // verify the bottom half first, then the top half, and add up what each one holds
+    int bottomCount = verifyHFile(bottomOut);
+    int topCount = verifyHFile(topOut);
+    assertEquals(1000, bottomCount + topCount);
+  }
+
+  /**
+   * Split test where the bulk load file is written with {@code NONE} encoding and the column
+   * family is configured with {@code NONE} encoding.
+   */
+  @Test
+  public void testSplitStoreFileWithNoneToNone() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
+  }
+
+  /**
+   * Split test where the bulk load file is written with {@code DIFF} encoding and the column
+   * family is configured with {@code DIFF} encoding.
+   */
+  @Test
+  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
+  }
+
+  /**
+   * Split test where the bulk load file is written with {@code DIFF} encoding and the column
+   * family is configured with {@code NONE} encoding.
+   */
+  @Test
+  public void testSplitStoreFileWithEncodedToNone() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
+  }
+
+  /**
+   * Split test where the bulk load file is written with {@code NONE} encoding and the column
+   * family is configured with {@code DIFF} encoding.
+   */
+  @Test
+  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
+  }
+
+  /**
+   * Writes a 1000-row HFile using {@code bulkloadEncoding}, splits it at row "ggg" for a column
+   * family configured with {@code cfEncoding}, and checks that both halves together still contain
+   * 1000 entries.
+   * @param bulkloadEncoding data block encoding used when writing the input HFile
+   * @param cfEncoding data block encoding set on the column family descriptor
+   */
+  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
+      DataBlockEncoding cfEncoding) throws IOException {
+    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
+    FileSystem fs = util.getTestFileSystem();
+    Path input = new Path(dir, "testhfile");
+    ColumnFamilyDescriptorBuilder familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    ColumnFamilyDescriptor familyDesc = familyBuilder.setDataBlockEncoding(cfEncoding).build();
+    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, input,
+      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+    byte[] splitKey = Bytes.toBytes("ggg");
+    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), input, familyDesc, splitKey,
+      bottomOut, topOut);
+
+    // verify the bottom half first, then the top half, and add up what each one holds
+    int bottomCount = verifyHFile(bottomOut);
+    int topCount = verifyHFile(topOut);
+    assertEquals(1000, bottomCount + topCount);
+  }
+
+  /**
+   * Opens the HFile at {@code p}, counts its entries by scanning it from the start, and asserts
+   * that it holds at least one entry.
+   * @param p path of the HFile to scan
+   * @return number of entries the scanner returned
+   * @throws IOException if the file cannot be opened or read
+   */
+  private int verifyHFile(Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader =
+        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
+    try {
+      reader.loadFileInfo();
+      HFileScanner scanner = reader.getScanner(false, false);
+      int count = 0;
+      // Only count when seekTo() reports that it could position the scanner; otherwise the
+      // do/while below would count one entry for a file that has none.
+      if (scanner.seekTo()) {
+        do {
+          count++;
+        } while (scanner.next());
+      }
+      assertTrue("No entries found in " + p, count > 0);
+      return count;
+    } finally {
+      // release the reader even when scanning or the assertion fails
+      reader.close();
+    }
+  }
+
+  /**
+   * Records one key range in {@code map}: the counter stored under {@code first} is incremented
+   * and the counter stored under {@code last} is decremented. A key that is not in the map yet is
+   * treated as having a counter of 0.
+   */
+  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
+    map.merge(first, 1, Integer::sum);
+    map.merge(last, -1, Integer::sum);
+  }
+
+  /**
+   * Feeds a set of overlapping key ranges to {@link LoadIncrementalHFiles#inferBoundaries} and
+   * checks the inferred split points.
+   */
+  @Test(timeout = 120000)
+  public void testInferBoundaries() {
+    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+    /*
+     * Toy example
+     *     c---------i            o------p          s---------t     v------x
+     * a------e    g-----k   m-------------q   r----s            u----w
+     *
+     * Should be inferred as:
+     * a-----------------k   m-------------q   r--------------t  u---------x
+     *
+     * The output should be (m,r,u)
+     */
+    String[][] ranges = { { "a", "e" }, { "r", "s" }, { "o", "p" }, { "g", "k" }, { "v", "x" },
+        { "c", "i" }, { "m", "q" }, { "s", "t" }, { "u", "w" } };
+    for (String[] range : ranges) {
+      // Bytes.toBytes encodes with a fixed charset; String.getBytes() depends on the platform
+      addStartEndKeysForTest(map, Bytes.toBytes(range[0]), Bytes.toBytes(range[1]));
+    }
+
+    byte[][] expected = { Bytes.toBytes("m"), Bytes.toBytes("r"), Bytes.toBytes("u") };
+    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+
+    // JUnit takes (expected, actual); the order matters for the failure message
+    assertEquals(expected.length, keysArray.length);
+    for (int row = 0; row < keysArray.length; row++) {
+      assertArrayEquals(expected[row], keysArray[row]);
+    }
+  }
+
+  /**
+   * Creates one HFile more than {@code MAX_FILES_PER_REGION_PER_FAMILY} in a single family
+   * directory, all covering the same key range, and expects the bulk load to be rejected with an
+   * {@link IOException} that names the limit.
+   */
+  @Test(timeout = 60000)
+  public void testLoadTooMayHFiles() throws Exception {
+    Path baseDir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
+    FileSystem fs = util.getTestFileSystem();
+    Path dir = baseDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    byte[] startRow = Bytes.toBytes("begin");
+    byte[] endRow = Bytes.toBytes("end");
+    // indices 0..MAX inclusive, i.e. one file over the limit
+    for (int fileNo = 0; fileNo <= MAX_FILES_PER_REGION_PER_FAMILY; fileNo++) {
+      Path hfile = new Path(familyDir, "hfile_" + fileNo);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, hfile, FAMILY, QUALIFIER, startRow,
+        endRow, 1000);
+    }
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" };
+    try {
+      loader.run(args);
+      fail("Bulk loading too many files should fail");
+    } catch (IOException ie) {
+      String expectedFragment =
+          "Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles";
+      assertTrue(ie.getMessage().contains(expectedFragment));
+    }
+  }
+
+  /**
+   * With {@link LoadIncrementalHFiles#CREATE_TABLE_CONF_KEY} set to "no", loading into a table
+   * that does not exist is expected to fail with a {@code TableNotFoundException}.
+   */
+  @Test(expected = TableNotFoundException.class)
+  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
+    // Work on a copy: the utility's Configuration is shared by every test in this class, and the
+    // "no" setting must not leak into the tests that run after this one.
+    Configuration conf = new Configuration(util.getConfiguration());
+    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    String[] args = { "directory", "nonExistingTable" };
+    loader.run(args);
+  }
+
+  /**
+   * Bulk loads a 1000-row HFile into a table whose only column family is named "_cf", i.e.
+   * starts with an underscore, and checks that all rows arrive.
+   */
+  @Test(timeout = 120000)
+  public void testTableWithCFNameStartWithUnderScore() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    String family = "_cf";
+    Path familyDir = new Path(dir, family);
+
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    Configuration conf = util.getConfiguration();
+    String tableName = tn.getMethodName();
+    // The table is opened in the resource header so it is closed even if creating the HFile or
+    // running the load throws.
+    try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
+      HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
+        QUALIFIER, from, to, 1000);
+
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+      String[] args = { dir.toString(), tableName };
+      loader.run(args);
+      assertEquals(1000, util.countRows(table));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..414a6cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFilesSplitRecovery {
+  // Logger for this class (it was previously created for TestHRegionServerBulkLoad, which made
+  // the log output of these tests show up under the wrong class name).
+  private static final Log LOG =
+      LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);
+
+  // Shared testing utility; assigned in setupCluster().
+  static HBaseTestingUtility util;
+  // used by secure subclass
+  static boolean useSecure = false;
+
+  // Number of column families in the generated HFiles.
+  final static int NUM_CFS = 10;
+  // Qualifier written into every generated HFile.
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  // Row count passed to the HFile generator and expected by the table checks.
+  final static int ROWCOUNT = 100;
+
+  // Family names as bytes, filled in by the static initializer below.
+  private final static byte[][] families = new byte[NUM_CFS][];
+
+  @Rule
+  public TestName name = new TestName();
+
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  /**
+   * Builds the row key for row {@code i}: "row_" followed by {@code i} zero-padded to 8 digits.
+   * Locale.ROOT keeps the digits ASCII whatever the JVM default locale is, so the result always
+   * matches the hard-coded "row_000000NN" split keys used elsewhere in this class.
+   */
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format(Locale.ROOT, "row_%08d", i));
+  }
+
+  /**
+   * Builds the name of column family {@code i}: "family_" followed by {@code i} zero-padded to 4
+   * digits. Locale.ROOT keeps the digits ASCII whatever the JVM default locale is.
+   */
+  static String family(int i) {
+    return String.format(Locale.ROOT, "family_%04d", i);
+  }
+
+  /**
+   * Builds the cell value for {@code i}: {@code i} zero-padded to 10 digits, as bytes.
+   * Locale.ROOT keeps the digits ASCII whatever the JVM default locale is.
+   */
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format(Locale.ROOT, "%010d", i));
+  }
+
+  /**
+   * Creates one HFile per column family under {@code dir}: for each of the {@code NUM_CFS}
+   * families a file named "hfile_&lt;index&gt;" is written into a sub-directory named after the
+   * family, with {@code ROWCOUNT} passed as the row count and {@code value(value)} as the value.
+   * @param fs file system to write to
+   * @param dir parent directory of the per-family directories
+   * @param value number the cell value is derived from
+   */
+  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
+    final byte[] cellValue = value(value);
+    for (int cf = 0; cf < NUM_CFS; cf++) {
+      String familyName = family(cf);
+      Path hfile = new Path(new Path(dir, familyName), "hfile_" + cf);
+      TestHRegionServerBulkLoad.createHFile(fs, hfile, Bytes.toBytes(familyName), QUAL, cellValue,
+        ROWCOUNT);
+    }
+  }
+
+  /**
+   * Builds a descriptor for table {@code name} with the column families {@code family(0)} to
+   * {@code family(cfs - 1)}, added in that order.
+   */
+  private TableDescriptor createTableDesc(TableName name, int cfs) {
+    final TableDescriptorBuilder descriptor = TableDescriptorBuilder.newBuilder(name);
+    IntStream.range(0, cfs).forEachOrdered(
+      idx -> descriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.of(family(idx))));
+    return descriptor.build();
+  }
+
+  /**
+   * Creates the table with the given name and number of column families. A table that already
+   * exists is left alone; the resulting {@code TableExistsException} is only logged.
+   * @param connection connection used to obtain the {@code Admin}
+   * @param table name of the table to create
+   * @param cfs number of column families
+   */
+  private void setupTable(final Connection connection, TableName table, int cfs)
+      throws IOException {
+    LOG.info("Creating table " + table);
+    try (Admin admin = connection.getAdmin()) {
+      TableDescriptor descriptor = createTableDesc(table, cfs);
+      admin.createTable(descriptor);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Creates the table with the given name, number of column families and split keys. A table
+   * that already exists is left alone; the resulting {@code TableExistsException} is only logged.
+   * @param table name of the table to create
+   * @param cfs number of column families
+   * @param SPLIT_KEYS split keys handed to the table creation call
+   */
+  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
+      throws IOException {
+    LOG.info("Creating table " + table);
+    try {
+      TableDescriptor descriptor = createTableDesc(table, cfs);
+      util.createTable(descriptor, SPLIT_KEYS);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Generates the HFiles for one bulk load of {@code table}. The files go into a directory named
+   * after the table name followed by {@code value}, under the test data directory for the table.
+   * @return the directory holding the per-family HFile directories
+   */
+  private Path buildBulkFiles(TableName table, int value) throws Exception {
+    String tableName = table.getNameAsString();
+    Path bulkDir = new Path(util.getDataTestDirOnTestFS(tableName), tableName + value);
+    buildHFiles(util.getTestFileSystem(), bulkDir, value);
+    return bulkDir;
+  }
+
+  /**
+   * Populates the table with known values by generating HFiles for {@code value} and bulk loading
+   * them with an unmodified {@link LoadIncrementalHFiles}.
+   * @param connection connection used to obtain the table, region locator and admin
+   * @param table table to load into
+   * @param value number the cell values are derived from
+   */
+  private void populateTable(final Connection connection, TableName table, int value)
+      throws Exception {
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    // one HFile per column family
+    Path hfileDir = buildBulkFiles(table, value);
+    try (Table target = connection.getTable(table);
+        RegionLocator regionLocator = connection.getRegionLocator(table);
+        Admin admin = connection.getAdmin()) {
+      loader.doBulkLoad(hfileDir, admin, target, regionLocator);
+    }
+  }
+
+  /**
+   * Split the known table in half (the split row is hard coded for this test suite as
+   * {@code rowkey(ROWCOUNT / 2)}), then poll the region server until it reports exactly two
+   * online regions for the table.
+   * <p>
+   * Failures are logged rather than rethrown, so callers do not get an exception when the split
+   * could not be requested or the wait was interrupted.
+   */
+  private void forceSplit(TableName table) {
+    try {
+      // need to call regions server to by synchronous but isn't visible.
+      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
+
+      for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+        if (hri.getTable().equals(table)) {
+          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
+        }
+      }
+
+      // verify that split completed.
+      int regions;
+      do {
+        regions = 0;
+        for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+          if (hri.getTable().equals(table)) {
+            regions++;
+          }
+        }
+        if (regions != 2) {
+          LOG.info("Taking some time to complete split...");
+          Thread.sleep(250);
+        }
+      } while (regions != 2);
+    } catch (IOException e) {
+      LOG.error("Failed to split table " + table, e);
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so code further up the stack can still see the interruption
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for the split of table " + table, e);
+    }
+  }
+
+  /**
+   * Creates the shared testing utility, sets the region coprocessor configuration key to the
+   * empty string and starts the mini cluster once for all tests in this class.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    // NOTE(review): the argument is presumably the number of region servers - confirm
+    util.startMiniCluster(1);
+  }
+
+  /**
+   * Shuts down the mini cluster started by {@link #setupCluster()}.
+   */
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Checks that exactly one table descriptor is listed for the table name, that every column
+   * value in the table equals {@code value(value)}, and that the table has {@code count} rows.
+   * @param table table to scan
+   * @param count expected number of rows
+   * @param value number the expected cell value is derived from
+   * @throws IOException if listing or scanning the table fails; the exception is propagated
+   *           unchanged so that its cause and stack trace show up in the test report
+   */
+  void assertExpectedTable(TableName table, int count, int value) throws IOException {
+    List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+    assertEquals(1, htds.size());
+    try (Table t = util.getConnection().getTable(table);
+        ResultScanner sr = t.getScanner(new Scan())) {
+      int i = 0;
+      for (Result r; (r = sr.next()) != null;) {
+        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+            .forEach(v -> assertArrayEquals(value(value), v));
+        i++;
+      }
+      assertEquals(count, i);
+    }
+  }
+
+  /**
+   * Test that shows that exception thrown from the RS side will result in an exception on the
+   * LIHFile client. The first call to {@code tryAtomicRegionLoad} is redirected to a callable
+   * built on a mocked connection whose {@code bulkLoadHFile} call always throws.
+   */
+  @Test(expected = IOException.class, timeout = 120000)
+  public void testBulkLoadPhaseFailure() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+            Collection<LoadQueueItem> lqis) throws IOException {
+          if (attemptedCalls.incrementAndGet() == 1) {
+            // first attempt only: swap in a callable that goes through the mocked connection
+            try {
+              Connection errConn = getMockedConnection(util.getConfiguration());
+              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
+            } catch (Exception e) {
+              LOG.fatal("mocking cruft, should never happen", e);
+              // keep the cause so the real failure is visible in the test report
+              throw new RuntimeException("mocking cruft, should never happen", e);
+            }
+          }
+          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+        }
+      };
+      // create HFiles for different column families
+      Path dir = buildBulkFiles(table, 1);
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(dir, admin, t, locator);
+      }
+      fail("doBulkLoad should have thrown an exception");
+    } finally {
+      // Restore the shared configuration whatever happened above, including a failure while
+      // setting up the table.
+      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    }
+  }
+
+  /**
+   * Test that shows that exception thrown from the RS side will result in the expected number of
+   * retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
+   * {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
+   */
+  @Test
+  public void testRetryOnIOException() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    final AtomicInteger calls = new AtomicInteger(1);
+    final Configuration conf = util.getConfiguration();
+    // remember the current value so the shared configuration can be put back afterwards
+    final int previousRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    // As before, the connection is created before the retry settings are changed.
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+      conf.setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+      final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(conf) {
+        @Override
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
+            Collection<LoadQueueItem> lqis) throws IOException {
+          // NOTE(review): calls starts at 1 and the retry count is set to 2 above, so on the
+          // first call this compares 1 < 1 and the failing callable appears never to be used -
+          // confirm whether the test injects a failure at all.
+          if (calls.getAndIncrement() < util.getConfiguration().getInt(
+            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
+            ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
+                conn, tableName, first,
+                new RpcControllerFactory(util.getConfiguration()).newController(),
+                HConstants.PRIORITY_UNSET) {
+              @Override
+              public byte[] rpcCall() throws Exception {
+                throw new IOException("Error calling something on RegionServer");
+              }
+            };
+            return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+          } else {
+            return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+          }
+        }
+      };
+      setupTable(conn, table, 10);
+      Path dir = buildBulkFiles(table, 1);
+      // close the table, locator and admin instead of leaking them
+      try (Table t = conn.getTable(table);
+          RegionLocator locator = conn.getRegionLocator(table);
+          Admin admin = conn.getAdmin()) {
+        lih.doBulkLoad(dir, admin, t, locator);
+      }
+    } finally {
+      // runs on success and on failure, so later tests see the settings they expect
+      conf.setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, previousRetries);
+    }
+  }
+
+  /**
+   * Builds a Mockito mock of {@link ClusterConnection} for failure injection. The mock returns
+   * {@code conf} as its configuration, answers every region lookup with one fixed location, and
+   * hands out a client stub whose {@code bulkLoadHFile} call always throws a
+   * {@link ServiceException} wrapping an {@link IOException}.
+   * @param conf configuration the mocked connection reports
+   * @return the mocked connection
+   */
+  private ClusterConnection getMockedConnection(final Configuration conf)
+      throws IOException, ServiceException {
+    // client stub that fails every bulk load request
+    ClientProtos.ClientService.BlockingInterface failingClient =
+        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
+    ServiceException injected = new ServiceException(new IOException("injecting bulk load error"));
+    Mockito
+        .when(failingClient.bulkLoadHFile((RpcController) Mockito.any(),
+          (BulkLoadHFileRequest) Mockito.any()))
+        .thenThrow(injected);
+
+    // Make it so we return a particular location when asked.
+    ServerName server = ServerName.valueOf("example.org", 1234, 0);
+    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, server);
+
+    ClusterConnection mocked = Mockito.mock(ClusterConnection.class);
+    Mockito.when(mocked.getConfiguration()).thenReturn(conf);
+    Mockito.doNothing().when(mocked).close();
+    Mockito.when(mocked.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(),
+      Mockito.anyBoolean())).thenReturn(loc);
+    Mockito.when(mocked.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any()))
+        .thenReturn(loc);
+    Mockito.when(mocked.getClient(Mockito.any(ServerName.class))).thenReturn(failingClient);
+    return mocked;
+  }
+
+  /**
+   * This test exercises the path where there is a split after initial validation but before the
+   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
+   * split just before the atomic region load.
+   */
+  @Test(timeout = 120000)
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      populateTable(connection, table, 1);
+      assertExpectedTable(table, ROWCOUNT, 1);
+
+      // Now let's cause trouble. This will occur after checks and cause bulk
+      // files to fail when attempt to atomically import. This is recoverable.
+      final AtomicInteger attemptedCalls = new AtomicInteger();
+      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected void bulkLoadPhase(final Table htable, final Connection conn,
+            ExecutorService pool, Deque<LoadQueueItem> queue,
+            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+            Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+          int i = attemptedCalls.incrementAndGet();
+          if (i == 1) {
+            // On first attempt force a split.
+            forceSplit(table);
+          }
+          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+        }
+      };
+
+      // create HFiles for different column families
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        Path bulk = buildBulkFiles(table, 2);
+        lih2.doBulkLoad(bulk, admin, t, locator);
+      }
+
+      // check that data was loaded
+      // The three expected attempts are 1) failure because need to split, 2)
+      // load of split top 3) load of split bottom
+      // (expected value first, so a failure message reads the right way round)
+      assertEquals(3, attemptedCalls.get());
+      assertExpectedTable(table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
+   * Splits the table first and then bulk loads into it, so the bulk import files have to be split
+   * before they can be imported atomically. The test counts the load queue items returned by
+   * {@code groupOrSplit} and expects 20 of them in total.
+   */
+  @Test(timeout = 120000)
+  public void testGroupOrSplitPresplit() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      populateTable(connection, table, 1);
+      assertExpectedTable(connection, table, ROWCOUNT, 1);
+      forceSplit(table);
+
+      final AtomicInteger countedLqis = new AtomicInteger();
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+            final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          Pair<List<LoadQueueItem>, String> result =
+              super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+          // tally the returned items; a missing pair or missing list contributes nothing
+          List<LoadQueueItem> items = result == null ? null : result.getFirst();
+          if (items != null) {
+            countedLqis.addAndGet(items.size());
+          }
+          return result;
+        }
+      };
+
+      // create HFiles for different column families
+      Path hfileDir = buildBulkFiles(table, 2);
+      try (Table target = connection.getTable(table);
+          RegionLocator regionLocator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(hfileDir, admin, target, regionLocator);
+      }
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+      assertEquals(20, countedLqis.get());
+    }
+  }
+
+  /**
+   * Creates a table with many small regions, so the bulk load files have to be split several
+   * times before all of them can be loaded. Afterwards the temporary directory under the first
+   * family directory must still exist but must no longer contain anything.
+   */
+  @Test(timeout = 120000)
+  public void testSplitTmpFileCleanUp() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
+        Bytes.toBytes("row_00000050") };
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+
+      // create HFiles
+      Path hfileDir = buildBulkFiles(table, 2);
+      try (Table target = connection.getTable(table);
+          RegionLocator regionLocator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        loader.doBulkLoad(hfileDir, admin, target, regionLocator);
+      }
+
+      // TMP_DIR sits under the directory of the first family
+      Path familyPath = new Path(hfileDir, family(0));
+      Path tmpPath = new Path(familyPath, LoadIncrementalHFiles.TMP_DIR);
+      FileSystem fs = hfileDir.getFileSystem(util.getConfiguration());
+      // the HFiles have been split, so TMP_DIR exists
+      assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned up
+      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+        FSUtils.listStatus(fs, tmpPath));
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
+   * This simulates a remote exception which should cause LIHF to exit with an exception.
+   * <p>
+   * The fifth call to groupOrSplit throws an IOException; the test passes only if that exception
+   * propagates out of doBulkLoad.
+   */
+  @Test(expected = IOException.class, timeout = 120000)
+  public void testGroupOrSplitFailure() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, tableName, 10);
+
+      // Counted atomically, like the counters in the sibling tests, so that the injected failure
+      // fires exactly once even if the loader invokes groupOrSplit from more than one thread.
+      final AtomicInteger calls = new AtomicInteger();
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+            final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          // fail on the fifth invocation only
+          if (calls.incrementAndGet() == 5) {
+            throw new IOException("failure");
+          }
+          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+        }
+      };
+
+      // create HFiles for different column families
+      Path dir = buildBulkFiles(tableName, 1);
+      try (Table t = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(dir, admin, t, locator);
+      }
+    }
+
+    // only reached when no exception escaped the bulk load
+    fail("doBulkLoad should have thrown an exception");
+  }
+
+  /**
+   * Bulk loads once into an intact table, then deletes the region with the empty start key from
+   * hbase:meta and bulk loads again. Afterwards the table must still hold the values of the first
+   * load and at least one region must still be listed in hbase:meta.
+   */
+  @Test(timeout = 120000)
+  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final byte[][] splitKeys = new byte[][] { Bytes.toBytes("row_00000100") };
+    // Share connection. We were failing to find the table with our new reverse scan because it
+    // looks for first region, not any region -- that is how it works now. The below removes first
+    // region in test. Was reliant on the Connection caching having first region.
+    // Both resources are opened in try-with-resources so that they are released even when one of
+    // the assertions below fails. NOTE(review): 'table' is only opened and closed, never used.
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
+        Table table = connection.getTable(tableName)) {
+
+      setupTableWithSplitkeys(tableName, 10, splitKeys);
+      Path dir = buildBulkFiles(tableName, 2);
+
+      final AtomicInteger countedLqis = new AtomicInteger();
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
+
+        @Override
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+            final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          Pair<List<LoadQueueItem>, String> lqis =
+              super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+          if (lqis != null && lqis.getFirst() != null) {
+            countedLqis.addAndGet(lqis.getFirst().size());
+          }
+          return lqis;
+        }
+      };
+
+      // do bulkload when there is no region hole in hbase:meta.
+      try (Table t = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName);
+          Admin admin = connection.getAdmin()) {
+        loader.doBulkLoad(dir, admin, t, locator);
+      } catch (Exception e) {
+        // only logged here; the content check right below fails if the load did not go through
+        LOG.error("exception=", e);
+      }
+      // check if all the data are loaded into the table.
+      this.assertExpectedTable(tableName, ROWCOUNT, 2);
+
+      dir = buildBulkFiles(tableName, 3);
+
+      // Mess it up by leaving a hole in the hbase:meta
+      List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+      for (HRegionInfo regionInfo : regionInfos) {
+        if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+          MetaTableAccessor.deleteRegion(connection, regionInfo);
+          break;
+        }
+      }
+
+      try (Table t = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName);
+          Admin admin = connection.getAdmin()) {
+        loader.doBulkLoad(dir, admin, t, locator);
+      } catch (Exception e) {
+        LOG.error("exception=", e);
+        assertTrue("IOException expected", e instanceof IOException);
+      }
+
+      // Make sure at least the one region that still exists can be found.
+      regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+      assertTrue(regionInfos.size() >= 1);
+
+      // the table must still contain the values of the first load (2), not of the second (3)
+      this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
+    }
+  }
+
+  /**
+   * Checks that all columns have the expected value and that there is the expected number of rows.
+   * @param connection connection used to open and scan the table
+   * @param table name of the table to verify
+   * @param count expected number of rows
+   * @param value argument passed to {@code value(int)} to build the expected cell content
+   * @throws IOException if listing the table descriptors fails; an IOException raised while
+   *           scanning is reported as a test failure instead
+   */
+  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
+      throws IOException {
+    List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+    // expected value goes first so that a failure message reads correctly
+    assertEquals(1, htds.size());
+    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
+      int i = 0;
+      for (Result r; (r = sr.next()) != null;) {
+        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+            .forEach(v -> assertArrayEquals(value(value), v));
+        i++;
+      }
+      assertEquals(count, i);
+    } catch (IOException e) {
+      // same failure type and message as before, but the cause is kept for diagnosis
+      throw new AssertionError("Failed due to exception", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
new file mode 100644
index 0000000..3d4f4c6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite is unable
+ * to verify the security handoff/turnover as miniCluster is running as system user thus has root
+ * privileges and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a secure
+ * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(util.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+    // enable security and set the limit of files per region per family used by the loader
+    SecureTestUtil.enableSecurity(util.getConfiguration());
+    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+      KeyValueCodecWithTags.class.getCanonicalName());
+
+    util.startMiniCluster();
+
+    // Wait for the ACL table to become available
+    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+
+    setupNamespace();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..58fea9d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode. This
+ * suite is unable to verify the security handoff/turnover as miniCluster is running as system
+ * user thus has root privileges and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a secure
+ * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFilesSplitRecovery
+    extends TestLoadIncrementalHFilesSplitRecovery {
+
+  // This "overrides" the parent static method
+  // make sure they are in sync
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(util.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+    // setup configuration
+    SecureTestUtil.enableSecurity(util.getConfiguration());
+
+    util.startMiniCluster();
+
+    // Wait for the ACL table to become available
+    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+  }
+
+  // Disabled by overriding it with an empty body, as this test does not work in secure mode
+  @Test(timeout = 180000)
+  @Override
+  public void testBulkLoadPhaseFailure() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index f45c0b9..33fbb68 100644
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
 import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
index 040546d..2adba32 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -41,7 +41,7 @@ import java.util.List;
  *  path/to/hbase-spark.jar {path/to/output/HFiles}
  *
  * This example will output put hfiles in {path/to/output/HFiles}, and user can run
- * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
+ * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
  */
 final public class JavaHBaseBulkLoadExample {
   private JavaHBaseBulkLoadExample() {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index bfacbe8..e383b5e 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index d2b707e..a427327 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.spark
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
 import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
 import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index f96cd6c..6f7f9e0 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -577,7 +577,7 @@ There are two ways to invoke this utility, with explicit classname and via the d
 
 .Explicit Classname
 ----
-$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
+$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
 ----
 
 .Driver


Mime
View raw message