hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [hbase] 28/28: HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutdown
Date Mon, 03 Jun 2019 09:49:39 GMT
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7e5ce0123c857e7cbec349dfe1482769d348d9b8
Author: zhangduo <zhangduo@apache.org>
AuthorDate: Sun Jun 2 21:54:29 2019 +0800

    HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor
service if the connection has been shutdown
---
 .../hadoop/hbase/client/TableOverAsyncTable.java   | 38 ++++++++++++++--------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 30e3062..5686b09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -38,11 +38,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -422,20 +425,29 @@ class TableOverAsyncTable implements Table {
     // get regions covered by the row range
     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
     Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (byte[] r : keys) {
-      RegionCoprocessorRpcChannel channel = coprocessorService(r);
-      Future<R> future = pool.submit(new Callable<R>() {
-        @Override
-        public R call() throws Exception {
-          R result = call.call(channel);
-          byte[] region = channel.getLastRegion();
-          if (callback != null) {
-            callback.update(region, r, result);
+    try {
+      for (byte[] r : keys) {
+        RegionCoprocessorRpcChannel channel = coprocessorService(r);
+        Future<R> future = pool.submit(new Callable<R>() {
+          @Override
+          public R call() throws Exception {
+            R result = call.call(channel);
+            byte[] region = channel.getLastRegion();
+            if (callback != null) {
+              callback.update(region, r, result);
+            }
+            return result;
           }
-          return result;
-        }
-      });
-      futures.put(r, future);
+        });
+        futures.put(r, future);
+      }
+    } catch (RejectedExecutionException e) {
+      // maybe the connection has been closed, let's check
+      if (pool.isShutdown()) {
+        throw new DoNotRetryIOException("Connection is closed", e);
+      } else {
+        throw new HBaseIOException("Coprocessor operation is rejected", e);
+      }
     }
     for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
       try {


Mime
View raw message