hbase-commits mailing list archives

From e...@apache.org
Subject [41/49] git commit: HBASE-10818. Add integration test for bulkload with replicas (Nick Dimiduk and Devaraj Das)
Date Sat, 28 Jun 2014 00:31:27 GMT
HBASE-10818. Add integration test for bulkload with replicas (Nick Dimiduk and Devaraj Das)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5ec49b1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5ec49b1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5ec49b1

Branch: refs/heads/master
Commit: e5ec49b123c4070355182f04397d8c8c347c9aff
Parents: 1daf8ac
Author: Devaraj Das <ddas@Devaraj-Dass-MacBook-Pro-2.local>
Authored: Fri May 23 17:59:05 2014 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jun 27 16:39:40 2014 -0700

----------------------------------------------------------------------
 .../hbase/IntegrationTestRegionReplicaPerf.java |  39 +-----
 .../mapreduce/IntegrationTestBulkLoad.java      | 127 +++++++++++++++----
 ...stTimeBoundedRequestsWithRegionReplicas.java |   5 +
 .../hbase/mapreduce/TableRecordReaderImpl.java  |   9 +-
 .../mapreduce/TableSnapshotInputFormat.java     |   2 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  39 ++++++
 6 files changed, 158 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
index ca3a8f0..8ea27bf 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.chaos.policies.Policy;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
@@ -222,42 +221,6 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
     return null;
   }
 
-  /**
-   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
-   */
-  private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
-    admin.modifyTable(desc.getTableName(), desc);
-    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
-      setFirst(0);
-      setSecond(0);
-    }};
-    for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
-      status = admin.getAlterStatus(desc.getTableName());
-      if (status.getSecond() != 0) {
-        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
-          + " regions updated.");
-        Thread.sleep(1 * 1000l);
-      } else {
-        LOG.debug("All regions updated.");
-      }
-    }
-    if (status.getSecond() != 0) {
-      throw new Exception("Failed to update replica count after 500 seconds.");
-    }
-  }
-
-  /**
-   * Set the number of Region replicas.
-   */
-  private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
-      throws Exception {
-    admin.disableTable(table);
-    HTableDescriptor desc = admin.getTableDescriptor(table);
-    desc.setRegionReplication(replicaCount);
-    modifyTableSync(admin, desc);
-    admin.enableTable(table);
-  }
-
   public void test() throws Exception {
     int maxIters = 3;
     String mr = nomapred ? "--nomapred" : "";
@@ -294,7 +257,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
     // disable monkey, enable region replicas, enable monkey
     cleanUpMonkey("Altering table.");
     LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
-    setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
+    util.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
     setUpMonkey();
     startMonkey();
 

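For context: the two private helpers deleted above are promoted to public statics on HBaseTestingUtility in the final hunk of this commit, so other integration tests can reuse them. A minimal sketch (not part of this commit) of the new call pattern, assuming a test already holds an HBaseTestingUtility instance:

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ReplicaSetupSketch {
  // Disables the table, sets REGION_REPLICATION, waits for the alter to
  // finish, then re-enables the table.
  public static void bumpReplicas(HBaseTestingUtility util, String table, int replicas)
      throws Exception {
    HBaseAdmin admin = util.getHBaseAdmin();
    HBaseTestingUtility.setReplicas(admin, TableName.valueOf(table), replicas);
  }
}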
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index dd4415b..4112014 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -28,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.RandomStringUtils;
@@ -38,14 +37,25 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -69,6 +79,9 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test Bulk Load and MR on a distributed cluster.
  * It starts an MR job that creates linked chains
@@ -99,15 +112,17 @@ import org.junit.experimental.categories.Category;
  * hbase.IntegrationTestBulkLoad.tableName
  * The name of the table.
  *
+ * hbase.IntegrationTestBulkLoad.replicaCount
+ * How many region replicas to configure for the table under test.
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
   private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
 
-  private static byte[] CHAIN_FAM = Bytes.toBytes("L");
-  private static byte[] SORT_FAM  = Bytes.toBytes("S");
-  private static byte[] DATA_FAM  = Bytes.toBytes("D");
+  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
+  private static final byte[] SORT_FAM  = Bytes.toBytes("S");
+  private static final byte[] DATA_FAM  = Bytes.toBytes("D");
 
   private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
   private static int CHAIN_LENGTH = 500000;
@@ -123,9 +138,73 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
   private static String TABLE_NAME = "IntegrationTestBulkLoad";
 
+  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
+  private static int NUM_REPLICA_COUNT_DEFAULT = 1;
+
+  private static final String OPT_LOAD = "load";
+  private static final String OPT_CHECK = "check";
+
+  private boolean load = false;
+  private boolean check = false;
+
+  public static class SlowMeCoproScanOperations extends BaseRegionObserver {
+    static final AtomicLong sleepTime = new AtomicLong(2000);
+    Random r = new Random();
+    AtomicLong countOfNext = new AtomicLong(0); 
+    AtomicLong countOfOpen = new AtomicLong(0); 
+    public SlowMeCoproScanOperations() {}
+    @Override
+    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Scan scan, final RegionScanner s) throws IOException {
+      if (countOfOpen.incrementAndGet() % 4 == 0) { //slowdown openScanner randomly
+        slowdownCode(e);
+      }
+      return s;
+    }
+
+    @Override
+    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final InternalScanner s, final List<Result> results,
+        final int limit, final boolean hasMore) throws IOException {
+      //this will slow down a certain next operation if the conditions are met. The slowness
+      //will allow the call to go to a replica
+      if (countOfNext.incrementAndGet() % 4 == 0) {
+        slowdownCode(e);
+      }
+      return true;
+    }
+    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
+      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
+        try {
+          if (sleepTime.get() > 0) {
+            LOG.info("Sleeping for " + sleepTime.get() + " ms");
+            Thread.sleep(sleepTime.get());
+          }
+        } catch (InterruptedException e1) {
+          LOG.error(e1);
+        }
+      } 
+    }
+  }
+
+  /**
+   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
+   */
+  private void installSlowingCoproc() throws IOException, InterruptedException {
+    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
+
+    TableName t = TableName.valueOf(getTablename());
+    HBaseAdmin admin = util.getHBaseAdmin();
+    HTableDescriptor desc = admin.getTableDescriptor(t);
+    desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
+    HBaseTestingUtility.modifyTableSync(admin, desc);
+  }
+
   @Test
   public void testBulkLoad() throws Exception {
     runLoad();
+    installSlowingCoproc();
     runCheck();
   }
 
@@ -145,7 +224,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     return split.split(numRegions);
   }
 
-  private void setupTable() throws IOException {
+  private void setupTable() throws IOException, InterruptedException {
     if (util.getHBaseAdmin().tableExists(getTablename())) {
       util.deleteTable(getTablename());
     }
@@ -155,6 +234,12 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
         new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
         getSplits(16)
     );
+
+    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
+
+    TableName t = TableName.valueOf(getTablename());
+    HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
   }
 
   private void runLinkedListMRJob(int iteration) throws Exception {
@@ -556,23 +641,23 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     Path p = util.getDataTestDirOnTestFS(jobName);
 
     Job job = new Job(conf);
-
     job.setJarByClass(getClass());
+    job.setJobName(jobName);
 
     job.setPartitionerClass(NaturalKeyPartitioner.class);
     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
     job.setSortComparatorClass(CompositeKeyComparator.class);
 
-    Scan s = new Scan();
-    s.addFamily(CHAIN_FAM);
-    s.addFamily(SORT_FAM);
-    s.setMaxVersions(1);
-    s.setCacheBlocks(false);
-    s.setBatch(1000);
+    Scan scan = new Scan();
+    scan.addFamily(CHAIN_FAM);
+    scan.addFamily(SORT_FAM);
+    scan.setMaxVersions(1);
+    scan.setCacheBlocks(false);
+    scan.setBatch(1000);
 
     TableMapReduceUtil.initTableMapperJob(
         Bytes.toBytes(getTablename()),
-        new Scan(),
+        scan,
         LinkedListCheckingMapper.class,
         LinkKey.class,
         LinkChain.class,
@@ -595,6 +680,10 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
   public void setUpCluster() throws Exception {
     util = getTestingUtil(getConf());
     util.initializeCluster(1);
+    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
+      LOG.debug("Region Replicas enabled: " + replicaCount);
+    }
 
     // Scale this up on a real cluster
     if (util.isDistributedCluster()) {
@@ -607,12 +696,6 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     }
   }
 
-  private static final String OPT_LOAD = "load";
-  private static final String OPT_CHECK = "check";
-
-  private boolean load = false;
-  private boolean check = false;
-
   @Override
   protected void addOptions() {
     super.addOptions();
@@ -632,6 +715,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     if (load) {
       runLoad();
     } else if (check) {
+      installSlowingCoproc();
       runCheck();
     } else {
       testBulkLoad();
@@ -655,5 +739,4 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     int status =  ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
     System.exit(status);
   }
-
-}
\ No newline at end of file
+}

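For context, a sketch (not part of this commit) of driving the updated test with the new configuration key. Option parsing is inherited from AbstractHBaseTool; with neither the "load" nor the "check" option set, doWork() above falls through to testBulkLoad(), which runs the load, installs the slowing coprocessor, and then runs the check:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadReplicaDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // New key from this commit; the default of 1 keeps the replica paths disabled.
    conf.setInt("hbase.IntegrationTestBulkLoad.replicaCount", 3);
    System.exit(ToolRunner.run(conf, new IntegrationTestBulkLoad(), new String[0]));
  }
}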
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
index 66f3155..eb3bb70 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
@@ -234,6 +234,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
     protected AtomicLong timedOutReads = new AtomicLong();
     protected long runTime;
     protected Thread timeoutThread;
+    protected AtomicLong staleReads = new AtomicLong();
 
     public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
         TableName tableName, double verifyPercent) throws IOException {
@@ -263,6 +264,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
     @Override
     protected String progressInfo() {
       StringBuilder builder = new StringBuilder(super.progressInfo());
+      appendToStatus(builder, "stale_reads", staleReads.get());
       appendToStatus(builder, "get_timeouts", timedOutReads.get());
       return builder.toString();
     }
@@ -327,6 +329,9 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
           Result[] results, HTableInterface table, boolean isNullExpected)
           throws IOException {
         super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
+        for (Result r : results) {
+          if (r.isStale()) staleReads.incrementAndGet();
+        }
        // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC
         // to complete, but if the request took longer than timeout, we treat that as error.
         if (elapsedNano > timeoutNano) {

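The staleReads counter added above depends on the client marking results that were served by a secondary replica. A minimal sketch (not part of this commit; assumes a table created with REGION_REPLICATION > 1 and replicas online) of the timeline-consistent read whose isStale() flag this test counts:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

public class StaleReadSketch {
  static boolean readPossiblyStale(Configuration conf, String table, byte[] row)
      throws Exception {
    HTable htable = new HTable(conf, TableName.valueOf(table));
    try {
      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE); // allow reads from replicas
      Result r = htable.get(get);
      return r.isStale(); // true when a secondary replica answered
    } finally {
      htable.close();
    }
  }
}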
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 7eb7871..e8e6e8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -65,6 +65,7 @@ public class TableRecordReaderImpl {
   private TaskAttemptContext context = null;
   private Method getCounter = null;
   private long numRestarts = 0;
+  private long numStale = 0;
   private long timestamp;
   private int rowcount;
   private boolean logScannerActivity = false;
@@ -203,6 +204,7 @@ public class TableRecordReaderImpl {
     try {
       try {
         value = this.scanner.next();
+        if (value != null && value.isStale()) numStale++;
         if (logScannerActivity) {
           rowcount ++;
           if (rowcount >= logPerRowCount) {
@@ -230,6 +232,7 @@ public class TableRecordReaderImpl {
           scanner.next();    // skip presumed already mapped row
         }
         value = scanner.next();
+        if (value != null && value.isStale()) numStale++;
         numRestarts++;
       }
       if (value != null && value.size() > 0) {
@@ -270,11 +273,11 @@ public class TableRecordReaderImpl {
 
     ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
 
-    updateCounters(scanMetrics, numRestarts, getCounter, context);
+    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
   }
 
   protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
-      Method getCounter, TaskAttemptContext context) {
+      Method getCounter, TaskAttemptContext context, long numStale) {
     // we can get access to counters only if hbase uses new mapreduce APIs
     if (getCounter == null) {
       return;
@@ -289,6 +292,8 @@ public class TableRecordReaderImpl {
       }
       ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
           "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
+      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
+          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
     } catch (Exception e) {
       LOG.debug("can't update counter." + StringUtils.stringifyException(e));
     }

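The new NUM_SCAN_RESULTS_STALE counter lands in the same counter group as NUM_SCANNER_RESTARTS. A sketch (not part of this commit; assumes the group name resolves to "HBase Counters", the value of HBASE_COUNTER_GROUP_NAME in TableRecordReaderImpl) of reading it back once a job has finished:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public class StaleCounterSketch {
  static long staleScanResults(Job completedJob) throws Exception {
    Counter c = completedJob.getCounters()
        .findCounter("HBase Counters", "NUM_SCAN_RESULTS_STALE");
    return c == null ? 0 : c.getValue();
  }
}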
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index f8d4d18..8071c56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -152,7 +152,7 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
       if (result) {
         ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
         if (scanMetrics != null && context != null) {
-          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
         }
       }
       return result;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ec49b1/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 79bda27..3b64b73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1464,6 +1465,44 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   }
 
   /**
+   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
+   */
+  public static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc)
+      throws IOException, InterruptedException {
+    admin.modifyTable(desc.getTableName(), desc);
+    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+      setFirst(0);
+      setSecond(0);
+    }};
+    for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
+      status = admin.getAlterStatus(desc.getTableName());
+      if (status.getSecond() != 0) {
+        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+          + " regions updated.");
+        Thread.sleep(1 * 1000l);
+      } else {
+        LOG.debug("All regions updated.");
+        break;
+      }
+    }
+    if (status.getSecond() != 0) {
+      throw new IOException("Failed to update replica count after 500 seconds.");
+    }
+  }
+
+  /**
+   * Set the number of Region replicas.
+   */
+  public static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
+      throws IOException, InterruptedException {
+    admin.disableTable(table);
+    HTableDescriptor desc = admin.getTableDescriptor(table);
+    desc.setRegionReplication(replicaCount);
+    modifyTableSync(admin, desc);
+    admin.enableTable(table);
+  }
+
+  /**
    * Drop an existing table
    * @param tableName existing table
    */


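For context, a sketch (not part of this commit) of how a test can use the newly public modifyTableSync to apply a descriptor change and block until the alter completes, mirroring what installSlowingCoproc() does in IntegrationTestBulkLoad above:

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ModifyTableSyncSketch {
  // Assumes the table exists; modifyTableSync polls getAlterStatus until no
  // regions remain pending (or the wait gives up).
  static void addObserver(HBaseAdmin admin, TableName t, String coprocClass)
      throws Exception {
    HTableDescriptor desc = admin.getTableDescriptor(t);
    desc.addCoprocessor(coprocClass);
    HBaseTestingUtility.modifyTableSync(admin, desc);
  }
}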