hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject [1/3] hbase git commit: HBASE-14777 second addendum, better fix using CompletionService.
Date Thu, 26 Nov 2015 05:20:40 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 06c233834 -> dbbdb7b2e
  refs/heads/branch-1.2 1e654241f -> 89e34f36a
  refs/heads/master e73a9594c -> 6531b465a


HBASE-14777 second addendum, better fix using CompletionService.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6531b465
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6531b465
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6531b465

Branch: refs/heads/master
Commit: 6531b465a757f8f6dad08d924c0a7a88bd52e5a9
Parents: e73a959
Author: Lars Hofhansl <larsh@apache.org>
Authored: Wed Nov 25 21:17:56 2015 -0800
Committer: Lars Hofhansl <larsh@apache.org>
Committed: Wed Nov 25 21:17:56 2015 -0800

----------------------------------------------------------------------
 .../HBaseInterClusterReplicationEndpoint.java       | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6531b465/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 624ded6..7c07ecc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -154,6 +157,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
+    CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
     List<Entry> entries = replicateContext.getEntries();
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
@@ -195,7 +199,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
               " entries of total size " + replicateContext.getSize());
         }
 
-        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        int futures = 0;
         for (int i=0; i<entryLists.size(); i++) {
           if (!entryLists.get(i).isEmpty()) {
             if (LOG.isTraceEnabled()) {
@@ -203,16 +207,18 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
                   " entries of total size " + replicateContext.getSize());
             }
             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            futures.add(exec.submit(createReplicator(entryLists.get(i), i)));
+            pool.submit(createReplicator(entryLists.get(i), i));
+            futures++;
           }
         }
         IOException iox = null;
-        for (int index = futures.size() - 1; index >= 0; index--) {
+
+        for (int i=0; i<futures; i++) {
           try {
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
-            Future<Integer> f = futures.get(index);
-            entryLists.remove(f.get().intValue());
+            Future<Integer> f = pool.take();
+            entryLists.set(f.get().intValue(), Collections.<Entry>emptyList());
           } catch (InterruptedException ie) {
             iox =  new IOException(ie);
           } catch (ExecutionException ee) {


Mime
View raw message