hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [14/43] hbase git commit: HBASE-15001 Fix thread-safety issues with replication
Date Sat, 26 Dec 2015 17:07:47 GMT
HBASE-15001 Fix thread-safety issues with replication

ReplicationSinkManager and HBaseInterClusterReplicationEndpoint
perform certain unsafe operations which might lead to undesirable
behavior with multiwal enabled.

Signed-off-by: Elliott Clark <eclark@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48113d75
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48113d75
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48113d75

Branch: refs/heads/hbase-12439
Commit: 48113d7572bbea2f05ae619882d43aa41827f147
Parents: ba04e03
Author: Ashu Pachauri <ashu210890@gmail.com>
Authored: Thu Dec 17 13:25:39 2015 -0800
Committer: Elliott Clark <eclark@apache.org>
Committed: Fri Dec 18 11:48:55 2015 -0800

----------------------------------------------------------------------
 .../HBaseInterClusterReplicationEndpoint.java   | 27 +++++++++++++++-----
 .../regionserver/ReplicationSinkManager.java    | 21 ++++++++++-----
 .../TestReplicationSinkManager.java             | 26 +++++++++----------
 3 files changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 70cc420..78e3e00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -143,9 +143,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     int sleepMultiplier = 1;
 
     // Connect to peer cluster first, unless we have to stop
-    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
       replicationSinkMgr.chooseSinks();
-      if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -180,19 +180,24 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     List<Entry> entries = replicateContext.getEntries();
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
+    int numReplicated = 0;
 
     if (!peersSelected && this.isRunning()) {
       connectToPeers();
       peersSelected = true;
     }
 
-    if (replicationSinkMgr.getSinks().size() == 0) {
+    int numSinks = replicationSinkMgr.getNumSinks();
+    if (numSinks == 0) {
+      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
+          + " with the same set of edits.");
       return false;
     }
+
     // minimum of: configured threads, number of 100-waledit batches,
     //  and number of current sinks
-    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
-      replicationSinkMgr.getSinks().size());
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+
     List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
     if (n == 1) {
       entryLists.add(entries);
@@ -237,7 +242,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
             Future<Integer> f = pool.take();
-            entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
+            int index = f.get().intValue();
+            int batchSize =  entryLists.get(index).size();
+            entryLists.set(index, Collections.<Entry>emptyList());
+            // Now, we have marked the batch as done replicating, record its size
+            numReplicated += batchSize;
           } catch (InterruptedException ie) {
             iox =  new IOException(ie);
           } catch (ExecutionException ee) {
@@ -249,6 +258,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           // if we had any exceptions, try again
           throw iox;
         }
+        if (numReplicated != entries.size()) {
+          // Something went wrong here and we don't know what, let's just fail and retry.
+          LOG.warn("The number of edits replicated is different from the number received,"
+              + " failing for now.");
+          return false;
+        }
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
           walGroupId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 76fa6c2..0469f9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -105,7 +106,7 @@ public class ReplicationSinkManager {
    *
    * @return a replication sink to replicate to
    */
-  public SinkPeer getReplicationSink() throws IOException {
+  public synchronized SinkPeer getReplicationSink() throws IOException {
     if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
       LOG.info("Current list of sinks is out of date or empty, updating");
       chooseSinks();
@@ -127,7 +128,7 @@ public class ReplicationSinkManager {
    * @param sinkPeer
    *          The SinkPeer that had a failed replication attempt on it
    */
-  public void reportBadSink(SinkPeer sinkPeer) {
+  public synchronized void reportBadSink(SinkPeer sinkPeer) {
     ServerName serverName = sinkPeer.getServerName();
     int badReportCount = (badReportCounts.containsKey(serverName)
                     ? badReportCounts.get(serverName) : 0) + 1;
@@ -146,11 +147,14 @@ public class ReplicationSinkManager {
    * @param sinkPeer
    *          The SinkPeer that had a failed replication attempt on it
    */
-  public void reportSinkSuccess(SinkPeer sinkPeer) {
+  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
     badReportCounts.remove(sinkPeer.getServerName());
   }
 
-  void chooseSinks() {
+  /**
+   * Refresh the list of sinks.
+   */
+  public synchronized void chooseSinks() {
     List<ServerName> slaveAddresses = endpoint.getRegionServers();
     Collections.shuffle(slaveAddresses, random);
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
@@ -159,8 +163,13 @@ public class ReplicationSinkManager {
     badReportCounts.clear();
   }
 
-  List<ServerName> getSinks() {
-    return sinks;
+  public synchronized int getNumSinks() {
+    return sinks.size();
+  }
+
+  @VisibleForTesting
+  protected List<ServerName> getSinksForTesting() {
+    return Collections.unmodifiableList(sinks);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/48113d75/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 57c3196..104753a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -67,7 +67,7 @@ public class TestReplicationSinkManager {
 
     sinkManager.chooseSinks();
 
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(2, sinkManager.getNumSinks());
 
   }
 
@@ -81,7 +81,7 @@ public class TestReplicationSinkManager {
 
     sinkManager.chooseSinks();
 
-    assertEquals(1, sinkManager.getSinks().size());
+    assertEquals(1, sinkManager.getNumSinks());
   }
 
   @Test
@@ -93,14 +93,14 @@ public class TestReplicationSinkManager {
 
     sinkManager.chooseSinks();
     // Sanity check
-    assertEquals(1, sinkManager.getSinks().size());
+    assertEquals(1, sinkManager.getNumSinks());
 
     SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
 
     sinkManager.reportBadSink(sinkPeer);
 
     // Just reporting a bad sink once shouldn't have an effect
-    assertEquals(1, sinkManager.getSinks().size());
+    assertEquals(1, sinkManager.getNumSinks());
 
   }
 
@@ -120,9 +120,9 @@ public class TestReplicationSinkManager {
 
     sinkManager.chooseSinks();
     // Sanity check
-    assertEquals(3, sinkManager.getSinks().size());
+    assertEquals(3, sinkManager.getNumSinks());
 
-    ServerName serverName = sinkManager.getSinks().get(0);
+    ServerName serverName = sinkManager.getSinksForTesting().get(0);
 
     SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
 
@@ -133,12 +133,12 @@ public class TestReplicationSinkManager {
 
     // Reporting a bad sink more than the threshold count should remove it
     // from the list of potential sinks
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(2, sinkManager.getNumSinks());
 
     //
     // now try a sink that has some successes
     //
-    serverName = sinkManager.getSinks().get(0);
+    serverName = sinkManager.getSinksForTesting().get(0);
 
     sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
@@ -148,17 +148,17 @@ public class TestReplicationSinkManager {
     sinkManager.reportBadSink(sinkPeer);
 
     // did not remove the sink, since we had one successful try
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(2, sinkManager.getNumSinks());
 
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
     // still not remove, since the success reset the counter
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(2, sinkManager.getNumSinks());
 
     sinkManager.reportBadSink(sinkPeer);
     // but we exhausted the tries
-    assertEquals(1, sinkManager.getSinks().size());
+    assertEquals(1, sinkManager.getNumSinks());
   }
 
   @Test
@@ -174,7 +174,7 @@ public class TestReplicationSinkManager {
     sinkManager.chooseSinks();
     // Sanity check
 
-    List<ServerName> sinkList = sinkManager.getSinks();
+    List<ServerName> sinkList = sinkManager.getSinksForTesting();
     assertEquals(2, sinkList.size());
 
     ServerName serverNameA = sinkList.get(0);
@@ -190,7 +190,7 @@ public class TestReplicationSinkManager {
 
     // We've gone down to 0 good sinks, so the replication sinks
     // should have been refreshed now
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(2, sinkManager.getNumSinks());
   }
 
 }


Mime
View raw message