hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject hbase git commit: HBASE-10942. support parallel request cancellation for multi-get (Nicolas Liochon & Devaraj Das)
Date Tue, 03 Feb 2015 07:01:39 GMT
Repository: hbase
Updated Branches:
  refs/heads/master eb351b9ff -> cf5ad96fc


HBASE-10942. support parallel request cancellation for multi-get (Nicolas Liochon & Devaraj Das)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cf5ad96f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cf5ad96f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cf5ad96f

Branch: refs/heads/master
Commit: cf5ad96fcc2ac02889e8a96a5d99cac071e1f25c
Parents: eb351b9
Author: Devaraj Das <ddas@apache.org>
Authored: Mon Feb 2 23:01:31 2015 -0800
Committer: Devaraj Das <ddas@apache.org>
Committed: Mon Feb 2 23:01:31 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 41 ++++++++++--
 .../hbase/client/MultiServerCallable.java       | 18 +++--
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  7 +-
 .../hadoop/hbase/client/TestReplicasClient.java | 69 ++++++++++++++++++++
 4 files changed, 125 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cf5ad96f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index d00118c..1900a25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -685,21 +686,33 @@ class AsyncProcess {
       private final MultiAction<Row> multiAction;
       private final int numAttempt;
       private final ServerName server;
+      private final Set<MultiServerCallable<Row>> callsInProgress;
 
       private SingleServerRequestRunnable(
-          MultiAction<Row> multiAction, int numAttempt, ServerName server) {
+          MultiAction<Row> multiAction, int numAttempt, ServerName server,
+          Set<MultiServerCallable<Row>> callsInProgress) {
         this.multiAction = multiAction;
         this.numAttempt = numAttempt;
         this.server = server;
+        this.callsInProgress = callsInProgress;
       }
 
       @Override
       public void run() {
         MultiResponse res;
+        MultiServerCallable<Row> callable = null;
         try {
-          MultiServerCallable<Row> callable = createCallable(server, tableName, multiAction);
+          callable = createCallable(server, tableName, multiAction);
           try {
-            res = createCaller(callable).callWithoutRetries(callable, timeout);
+            RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+            if (callsInProgress != null) callsInProgress.add(callable);
+            res = caller.callWithoutRetries(callable, timeout);
+
+            if (res == null) {
+              // Cancelled
+              return;
+            }
+
           } catch (IOException e) {
             // The service itself failed . It may be an error coming from the communication
             //   layer, but, as well, a functional error raised by the server.
@@ -722,6 +735,9 @@ class AsyncProcess {
               throw new RuntimeException(t);
         } finally {
           decTaskCounters(multiAction.getRegions(), server);
+          if (callsInProgress != null && callable != null) {
+            callsInProgress.remove(callable);
+          }
         }
       }
     }
@@ -730,6 +746,7 @@ class AsyncProcess {
     private final BatchErrors errors;
     private final ConnectionManager.ServerErrorTracker errorsByServer;
     private final ExecutorService pool;
+    private final Set<MultiServerCallable<Row>> callsInProgress;
 
 
     private final TableName tableName;
@@ -814,10 +831,17 @@ class AsyncProcess {
       } else {
         this.replicaGetIndices = null;
       }
+      this.callsInProgress = !hasAnyReplicaGets ? null :
+          Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
+
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
     }
 
+    public Set<MultiServerCallable<Row>> getCallsInProgress() {
+      return callsInProgress;
+    }
+
     /**
      * Group a list of actions per region servers, and send them.
      *
@@ -980,7 +1004,7 @@ class AsyncProcess {
       // no stats to manage, just do the standard action
       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server)));
+            new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
       }
 
       // group the actions by the amount of delay
@@ -1002,7 +1026,8 @@ class AsyncProcess {
       for (DelayingRunner runner : actions.values()) {
         String traceText = "AsyncProcess.sendMultiAction";
         Runnable runnable =
-            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server);
+            new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
+                callsInProgress);
         // use a delay runner only if we need to sleep for some time
         if (runner.getSleepTime() > 0) {
           runner.setRunner(runnable);
@@ -1520,6 +1545,12 @@ class AsyncProcess {
         waitUntilDone(Long.MAX_VALUE);
       } catch (InterruptedException iex) {
         throw new InterruptedIOException(iex.getMessage());
+      } finally {
+        if (callsInProgress != null) {
+          for (MultiServerCallable<Row> clb : callsInProgress) {
+            clb.cancel();
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf5ad96f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 88e4e22..72ae829 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -50,21 +51,21 @@ import com.google.protobuf.ServiceException;
  * {@link RegionServerCallable} that goes against multiple regions.
  * @param <R>
  */
-class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
+class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> implements Cancellable {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
-  private RpcControllerFactory rpcFactory;
+  private final PayloadCarryingRpcController controller;
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
       final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) {
     super(connection, tableName, null);
-    this.rpcFactory = rpcFactory;
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so
     // we will store the server here, and throw if someone tries to obtain location/regioninfo.
     this.location = new HRegionLocation(null, location);
     this.cellBlock = isCellBlock();
+    controller = rpcFactory.newController();
   }
 
   @Override
@@ -119,7 +120,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
 
     // Controller optionally carries cell data over the proxy/service boundary and also
     // optionally ferries cell response data back out again.
-    PayloadCarryingRpcController controller = rpcFactory.newController(cells);
+    if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
     controller.setPriority(getTableName());
     controller.setCallTimeout(callTimeout);
     ClientProtos.MultiResponse responseProto;
@@ -129,10 +130,19 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
     } catch (ServiceException e) {
       throw ProtobufUtil.getRemoteException(e);
     }
+    if (responseProto == null) return null; // Occurs on cancel
     return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
   }
 
+  @Override
+  public void cancel() {
+    controller.startCancel();
+  }
 
+  @Override
+  public boolean isCancelled() {
+    return controller.isCanceled();
+  }
 
   /**
    * @return True if we should send data in cellblocks.  This is an expensive call.  Cache the

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf5ad96f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 7069a42..054c9b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -307,9 +307,14 @@ public class AsyncRpcChannel {
     controller.notifyOnCancel(new RpcCallback<Object>() {
       @Override
       public void run(Object parameter) {
-        failCall(call, new IOException("Canceled connection"));
+        calls.remove(call.id);
       }
     });
+    if (controller.isCanceled()) {
+      // To finish if the call was cancelled before we set the notification (race condition)
+      call.cancel(true);
+      return call;
+    }
 
     calls.put(call.id, call);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/cf5ad96f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index bb2d4db..efc8db2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -20,10 +20,12 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -522,6 +526,71 @@ public class TestReplicasClient {
   }
 
   @Test
+  public void testCancelOfMultiGet() throws Exception {
+    openRegion(hriSecondary);
+    try {
+      List<Put> puts = new ArrayList<Put>(2);
+      byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
+      Put p = new Put(b1);
+      p.add(f, b1, b1);
+      puts.add(p);
+
+      byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
+      p = new Put(b2);
+      p.add(f, b2, b2);
+      puts.add(p);
+      table.put(puts);
+      LOG.debug("PUT done");
+      flushRegion(hriPrimary);
+      LOG.info("flush done");
+
+      Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+      AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
+          .getAsyncProcess();
+
+      // Make primary slowdown
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+
+      List<Get> gets = new ArrayList<Get>();
+      Get g = new Get(b1);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      gets.add(g);
+      g = new Get(b2);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      gets.add(g);
+      Object[] results = new Object[2];
+      AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
+          gets, null, results);
+      reqs.waitUntilDone();
+      // verify we got the right results back
+      for (Object r : results) {
+        Assert.assertTrue(((Result)r).isStale());
+        Assert.assertTrue(((Result)r).getExists());
+      }
+      Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
+      // verify we did cancel unneeded calls
+      Assert.assertTrue(!set.isEmpty());
+      for (MultiServerCallable<Row> m : set) {
+        Assert.assertTrue(m.isCancelled());
+      }
+    } finally {
+      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+      for (int i = 0; i < 2; i++) {
+        byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
+        Delete d = new Delete(b1);
+        table.delete(d);
+      }
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
   public void testScanWithReplicas() throws Exception {
     //simple scan
     runMultipleScansOfOneType(false, false);


Mime
View raw message