hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-14777 Fix Inter Cluster Replication Future ordering issues
Date Thu, 19 Nov 2015 00:45:26 GMT
Repository: hbase
Updated Branches:
  refs/heads/master b2c20cebb -> c8fbaf0c9


HBASE-14777 Fix Inter Cluster Replication Future ordering issues

Signed-off-by: Elliott Clark <eclark@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c8fbaf0c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c8fbaf0c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c8fbaf0c

Branch: refs/heads/master
Commit: c8fbaf0c965b76f765e7230e51485add3c73df48
Parents: b2c20ce
Author: Ashu Pachauri <ashu210890@gmail.com>
Authored: Tue Nov 17 16:45:14 2015 -0800
Committer: Elliott Clark <eclark@apache.org>
Committed: Wed Nov 18 16:38:41 2015 -0800

----------------------------------------------------------------------
 .../HBaseInterClusterReplicationEndpoint.java   |  16 ++-
 .../hbase/replication/TestReplicationBase.java  |  13 ++-
 ...estReplicationChangingPeerRegionservers.java |   5 +-
 .../replication/TestReplicationEndpoint.java    | 101 ++++++++++++++++++-
 4 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c8fbaf0c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 4c719a9..624ded6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -30,6 +30,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -202,15 +203,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
                   " entries of total size " + replicateContext.getSize());
             }
             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+            futures.add(exec.submit(createReplicator(entryLists.get(i), i)));
           }
         }
         IOException iox = null;
-        for (Future<Integer> f : futures) {
+        for (int index = futures.size() - 1; index >= 0; index--) {
           try {
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
-            entryLists.remove(f.get());
+            Future<Integer> f = futures.get(index);
+            entryLists.remove(f.get().intValue());
           } catch (InterruptedException ie) {
             iox =  new IOException(ie);
           } catch (ExecutionException ee) {
@@ -289,7 +291,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return super.stopAndWait();
   }
 
-  private class Replicator implements Callable<Integer> {
+  @VisibleForTesting
+  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+    return new Replicator(entries, ordinal);
+  }
+
+  @VisibleForTesting
+  protected class Replicator implements Callable<Integer> {
     private List<Entry> entries;
     private int ordinal;
     public Replicator(List<Entry> entries, int ordinal) {

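A note on the change above: the patch now drains the futures from the highest index down and passes f.get().intValue() to entryLists.remove(...). Since entryLists is a list of entry batches, handing remove(...) a boxed Integer binds to the remove(Object) overload rather than remove(int), and removing completed batches in ascending index order would shift the positions of batches still pending removal. The snippet below is a minimal standalone illustration of those two List.remove behaviors; it is not HBase code, and the class and variable names are invented for the example.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the two List.remove behaviors relevant to the patch above.
public class RemoveOverloadDemo {
  public static void main(String[] args) {
    List<List<String>> batches = new ArrayList<>();
    batches.add(Arrays.asList("edit-a"));   // index 0
    batches.add(Arrays.asList("edit-b"));   // index 1
    batches.add(Arrays.asList("edit-c"));   // index 2

    Integer completed = 1;                  // e.g. the ordinal returned by a Future<Integer>

    batches.remove(completed);              // binds to remove(Object); the list holds no Integer,
                                            // so nothing is removed
    System.out.println(batches.size());     // prints 3

    batches.remove(completed.intValue());   // binds to remove(int); removes the batch at index 1
    System.out.println(batches.size());     // prints 2

    // Removing by ascending index also shifts every later element left by one, so further
    // removals by their original ordinals can hit the wrong batch; removing from the highest
    // index downward, as the patch does, keeps the pending indices stable.
  }
}
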
http://git-wip-us.apache.org/repos/asf/hbase/blob/c8fbaf0c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index ad9b227..ac87269 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -83,9 +83,11 @@ public class TestReplicationBase {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    // smaller log roll size to trigger more events
-    conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf1.setInt("replication.source.size.capacity", 10240);
+    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
+    // sufficient number of events. But we don't want to go too low because
+    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
+    // more than one batch sent to the peer cluster for better testing.
+    conf1.setInt("replication.source.size.capacity", 102400);
     conf1.setLong("replication.source.sleepforretries", 100);
     conf1.setInt("hbase.regionserver.maxlogs", 10);
     conf1.setLong("hbase.master.logcleaner.ttl", 10);
@@ -98,6 +100,7 @@ public class TestReplicationBase {
     conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
+    conf1.setFloat("replication.source.ratio", 1.0f);
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
@@ -126,7 +129,9 @@ public class TestReplicationBase {
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
     utility1.startMiniCluster(2);
-    utility2.startMiniCluster(2);
+    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
+    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
+    utility2.startMiniCluster(4);
 
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);

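Regarding the configuration comments above (tuning replication.source.size.capacity so that more than one batch reaches the peer, and starting four slave regionservers because the shipping logic factors the number of sinks into how many batches it sends in parallel), the sketch below illustrates the general idea of sink-bounded parallel batching. It is a hypothetical simplification, not the actual HBaseInterClusterReplicationEndpoint logic; the round-robin partitioning, class name, and constants are invented for the example (per the test comments, the real endpoint batches by region).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of sink-bounded batch parallelism; not the real shipping code.
public class ParallelBatchSketch {
  // Split edits into at most numBatches round-robin groups (illustrative only).
  static <T> List<List<T>> partition(List<T> edits, int numBatches) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < numBatches; i++) {
      batches.add(new ArrayList<T>());
    }
    for (int i = 0; i < edits.size(); i++) {
      batches.get(i % numBatches).add(edits.get(i));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> edits = Arrays.asList("e1", "e2", "e3", "e4", "e5", "e6");
    int numSinks = 4;      // e.g. the four slave regionservers started by the test
    int maxThreads = 10;   // an assumed upper bound on replication worker threads
    int parallelism = Math.min(numSinks, maxThreads);
    System.out.println(partition(edits, parallelism));  // more sinks -> more parallel batches
  }
}
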
http://git-wip-us.apache.org/repos/asf/hbase/blob/c8fbaf0c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
index ba2a7c1..53aabfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
@@ -92,6 +92,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
 
     LOG.info("testSimplePutDelete");
     MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+    int numRS = peerCluster.getRegionServerThreads().size();
 
     doPutTest(Bytes.toBytes(1));
 
@@ -100,14 +101,14 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
     peerCluster.waitOnRegionServer(rsToStop);
 
     // Sanity check
-    assertEquals(1, peerCluster.getRegionServerThreads().size());
+    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
 
     doPutTest(Bytes.toBytes(2));
 
     peerCluster.startRegionServer();
 
     // Sanity check
-    assertEquals(2, peerCluster.getRegionServerThreads().size());
+    assertEquals(numRS, peerCluster.getRegionServerThreads().size());
 
     doPutTest(Bytes.toBytes(3));
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c8fbaf0c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 66adf70..bf0cc1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -29,11 +29,14 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -60,7 +63,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TestReplicationBase.setUpBeforeClass();
-    utility2.shutdownMiniCluster(); // we don't need the second cluster
     admin.removePeer("2");
     numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
   }
@@ -185,6 +187,49 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
   @Test (timeout=120000)
+  public void testInterClusterReplication() throws Exception {
+    final String id = "testInterClusterReplication";
+
+    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
+    int totEdits = 0;
+
+    // Make sure edits are spread across regions because we do region based batching
+    // before shipping edits.
+    for(HRegion region: regions) {
+      HRegionInfo hri = region.getRegionInfo();
+      byte[] row = hri.getStartKey();
+      for (int i = 0; i < 100; i++) {
+        if (row.length > 0) {
+          doPut(row);
+          totEdits++;
+        }
+      }
+    }
+
+    admin.addPeer(id,
+        new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
+            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
+        null);
+
+    final int numEdits = totEdits;
+    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
+      }
+      @Override
+      public String explainFailure() throws Exception {
+        String failure = "Failed to replicate all edits, expected = " + numEdits
+            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
+        return failure;
+      }
+    });
+
+    admin.removePeer("testInterClusterReplication");
+    utility1.deleteTableData(tableName);
+  }
+
+  @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
       new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
@@ -270,6 +315,60 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
   }
 
+  public static class InterClusterReplicationEndpointForTest
+      extends HBaseInterClusterReplicationEndpoint {
+
+    static AtomicInteger replicateCount = new AtomicInteger();
+    static boolean failedOnce;
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      boolean success = super.replicate(replicateContext);
+      if (success) {
+        replicateCount.addAndGet(replicateContext.entries.size());
+      }
+      return success;
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      // Fail only once, we don't want to slow down the test.
+      if (failedOnce) {
+        return new DummyReplicator(entries, ordinal);
+      } else {
+        failedOnce = true;
+        return new FailingDummyReplicator(entries, ordinal);
+      }
+    }
+
+    protected class DummyReplicator extends Replicator {
+
+      private int ordinal;
+
+      public DummyReplicator(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+        this.ordinal = ordinal;
+      }
+
+      @Override
+      public Integer call() throws IOException {
+        return ordinal;
+      }
+    }
+
+    protected class FailingDummyReplicator extends DummyReplicator {
+
+      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      public Integer call() throws IOException {
+        throw new IOException("Sample Exception: Failed to replicate.");
+      }
+    }
+  }
+
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
     static int COUNT = 10;
     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);

