hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject hbase git commit: HBASE-13618 ReplicationSource is too eager to remove sinks.
Date Wed, 13 May 2015 04:33:13 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 4e0d26ea2 -> 2c02724c7


HBASE-13618 ReplicationSource is too eager to remove sinks.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c02724c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c02724c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c02724c

Branch: refs/heads/0.98
Commit: 2c02724c7b3369eac4a5acfe74de1c5fad8e005f
Parents: 4e0d26e
Author: Lars Hofhansl <larsh@apache.org>
Authored: Tue May 12 21:27:55 2015 -0700
Committer: Lars Hofhansl <larsh@apache.org>
Committed: Tue May 12 21:31:06 2015 -0700

----------------------------------------------------------------------
 .../HBaseInterClusterReplicationEndpoint.java   |  1 +
 .../regionserver/ReplicationSinkManager.java    | 10 +++++++
 .../TestReplicationSinkManager.java             | 30 ++++++++++++++++++--
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2c02724c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index ff74fd3..99a6714 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -163,6 +163,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+        replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return true;
 
       } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c02724c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 28fb50f..220f1df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -140,6 +140,16 @@ public class ReplicationSinkManager {
     }
   }
 
+  /**
+   * Report that a {@code SinkPeer} successfully replicated a chunk of data.
+   *
+   * @param sinkPeer
+   *          The SinkPeer that had a successful replication attempt on it
+   */
+  public void reportSinkSuccess(SinkPeer sinkPeer) {
+    badReportCounts.remove(sinkPeer.getServerName());
+  }
+
   void chooseSinks() {
     List<ServerName> slaveAddresses = endpoint.getRegionServers();
     Collections.shuffle(slaveAddresses, random);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c02724c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 6f66bff..fcde1e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -110,7 +110,7 @@ public class TestReplicationSinkManager {
   @Test
   public void testReportBadSink_PastThreshold() {
     List<ServerName> serverNames = Lists.newArrayList();
-    for (int i = 0; i < 20; i++) {
+    for (int i = 0; i < 30; i++) {
       serverNames.add(mock(ServerName.class));
     }
     when(replicationEndpoint.getRegionServers())
@@ -119,18 +119,44 @@ public class TestReplicationSinkManager {
 
     sinkManager.chooseSinks();
     // Sanity check
-    assertEquals(2, sinkManager.getSinks().size());
+    assertEquals(3, sinkManager.getSinks().size());
 
     ServerName serverName = sinkManager.getSinks().get(0);
 
     SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
 
+    sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
 
     // Reporting a bad sink more than the threshold count should remove it
     // from the list of potential sinks
+    assertEquals(2, sinkManager.getSinks().size());
+
+    //
+    // now try a sink that has some successes
+    //
+    serverName = sinkManager.getSinks().get(0);
+
+    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
+      sinkManager.reportBadSink(sinkPeer);
+    }
+    sinkManager.reportSinkSuccess(sinkPeer); // one success
+    sinkManager.reportBadSink(sinkPeer);
+
+    // did not remove the sink, since we had one successful try
+    assertEquals(2, sinkManager.getSinks().size());
+
+    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
+      sinkManager.reportBadSink(sinkPeer);
+    }
+    // still not removed, since the success reset the counter
+    assertEquals(2, sinkManager.getSinks().size());
+
+    sinkManager.reportBadSink(sinkPeer);
+    // but we exhausted the tries
     assertEquals(1, sinkManager.getSinks().size());
   }
 


Mime
View raw message