Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BB710200D16 for ; Tue, 10 Oct 2017 09:00:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BA09A160BE0; Tue, 10 Oct 2017 07:00:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6BD3B1609E5 for ; Tue, 10 Oct 2017 09:00:17 +0200 (CEST) Received: (qmail 48976 invoked by uid 500); 10 Oct 2017 07:00:16 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 48967 invoked by uid 99); 10 Oct 2017 07:00:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Oct 2017 07:00:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A81FF5AD6; Tue, 10 Oct 2017 07:00:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-18951 Use Builder pattern to remove nullable parameters for checkAndXXX methods in RawAsyncTable/AsyncTable interface Date: Tue, 10 Oct 2017 07:00:16 +0000 (UTC) archived-at: Tue, 10 Oct 2017 07:00:18 -0000 Repository: hbase Updated Branches: refs/heads/master c3b3fd788 -> 8597b19b3 HBASE-18951 Use Builder pattern to remove nullable parameters for checkAndXXX methods in RawAsyncTable/AsyncTable interface Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: 
http://git-wip-us.apache.org/repos/asf/hbase/commit/8597b19b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8597b19b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8597b19b Branch: refs/heads/master Commit: 8597b19b3d7bb2671a6afd7e902aaaea6690a105 Parents: c3b3fd7 Author: zhangduo Authored: Tue Oct 10 11:11:19 2017 +0800 Committer: zhangduo Committed: Tue Oct 10 14:41:23 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncTableBase.java | 158 ++++++++----------- .../hadoop/hbase/client/AsyncTableImpl.java | 57 +++++-- .../hadoop/hbase/client/RawAsyncTableImpl.java | 123 ++++++++++----- .../hadoop/hbase/client/TestAsyncTable.java | 53 ++++--- 4 files changed, 219 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8597b19b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index ed4c497..cc9f337 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -191,72 +191,75 @@ public interface AsyncTableBase { } /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * adds the put. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param put data to put if check succeeds - * @return true if the new put was executed, false otherwise. 
The return value will be wrapped by - * a {@link CompletableFuture}. - */ - default CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) { - return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, put); - } - - /** * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * adds the put. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param put data to put if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOperator compareOp, byte[] value, Put put); - - /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * adds the delete. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param delete data to delete if check succeeds - * @return true if the new delete was executed, false otherwise. The return value will be wrapped - * by a {@link CompletableFuture}. + * adds the Put/Delete/RowMutations. + *

+ * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. + * This is a fluent style API, the code is like: + * + * <pre>
+   * <code>
+   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
+   *     .thenAccept(succ -> {
+   *       if (succ) {
+   *         System.out.println("Check and put succeeded");
+   *       } else {
+   *         System.out.println("Check and put failed");
+   *       }
+   *     });
+   * </code>
+   * </pre>
*/ - default CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) { - return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, delete); + CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); + + /** + * A helper class for sending checkAndMutate request. + */ + interface CheckAndMutateBuilder { + + /** + * @param qualifier column qualifier to check. + */ + CheckAndMutateBuilder qualifier(byte[] qualifier); + + /** + * Check for lack of column. + */ + CheckAndMutateBuilder ifNotExists(); + + default CheckAndMutateBuilder ifEquals(byte[] value) { + return ifMatches(CompareOperator.EQUAL, value); + } + + /** + * @param compareOp comparison operator to use + * @param value the expected value + */ + CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); + + /** + * @param put data to put if check succeeds + * @return {@code true} if the new put was executed, {@code false} otherwise. The return value + * will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenPut(Put put); + + /** + * @param delete data to delete if check succeeds + * @return {@code true} if the new delete was executed, {@code false} otherwise. The return + * value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenDelete(Delete delete); + + /** + * @param mutation mutations to perform if check succeeds + * @return true if the new mutation was executed, false otherwise. The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenMutate(RowMutations mutation); } /** - * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * adds the delete. 
If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param delete data to delete if check succeeds - * @return true if the new delete was executed, false otherwise. The return value will be wrapped - * by a {@link CompletableFuture}. - */ - CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOperator compareOp, byte[] value, Delete delete); - - /** * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. * @param mutation object that specifies the set of mutations to perform atomically @@ -265,39 +268,6 @@ public interface AsyncTableBase { CompletableFuture mutateRow(RowMutations mutation); /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * performs the row mutations. If the passed value is null, the check is for the lack of column - * (ie: non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - default CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - byte[] value, RowMutations mutation) { - return checkAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, mutation); - } - - /** - * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * performs the row mutations. 
If the passed value is null, the check is for the lack of column - * (ie: non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp the comparison operator - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator compareOp, byte[] value, RowMutations mutation); - - /** * Return all the results that match the given scan object. *

* Notice that usually you should use this method with a {@link Scan} object that has limit set. http://git-wip-us.apache.org/repos/asf/hbase/blob/8597b19b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 2b7a64e..ae43f5b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -17,18 +17,19 @@ */ package org.apache.hadoop.hbase.client; +import static java.util.stream.Collectors.toList; + import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import static java.util.stream.Collectors.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * The implementation of AsyncTable. Based on {@link RawAsyncTable}. 
@@ -121,15 +122,44 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Put put) { - return wrap(rawTable.checkAndPut(row, family, qualifier, op, value, put)); - } + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + return new CheckAndMutateBuilder() { - @Override - public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Delete delete) { - return wrap(rawTable.checkAndDelete(row, family, qualifier, op, value, delete)); + private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); + + @Override + public CompletableFuture thenPut(Put put) { + return wrap(builder.thenPut(put)); + } + + @Override + public CompletableFuture thenMutate(RowMutations mutation) { + return wrap(builder.thenMutate(mutation)); + } + + @Override + public CompletableFuture thenDelete(Delete delete) { + return wrap(builder.thenDelete(delete)); + } + + @Override + public CheckAndMutateBuilder qualifier(byte[] qualifier) { + builder.qualifier(qualifier); + return this; + } + + @Override + public CheckAndMutateBuilder ifNotExists() { + builder.ifNotExists(); + return this; + } + + @Override + public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { + builder.ifMatches(compareOp, value); + return this; + } + }; } @Override @@ -138,12 +168,6 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, RowMutations mutation) { - return wrap(rawTable.checkAndMutate(row, family, qualifier, op, value, mutation)); - } - - @Override public CompletableFuture> scanAll(Scan scan) { return wrap(rawTable.scanAll(scan)); } @@ -198,5 +222,4 @@ class AsyncTableImpl implements AsyncTable { public List> batch(List actions) { return 
rawTable. batch(actions).stream().map(this::wrap).collect(toList()); } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/8597b19b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 722ee26..6107f7f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import com.google.protobuf.RpcChannel; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -59,8 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; -import com.google.protobuf.RpcChannel; - /** * The implementation of RawAsyncTable. 
*/ @@ -134,7 +135,7 @@ class RawAsyncTableImpl implements RawAsyncTable { Converter respConverter) { CompletableFuture future = new CompletableFuture<>(); try { - rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), + rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), new RpcCallback() { @Override @@ -251,28 +252,89 @@ class RawAsyncTableImpl implements RawAsyncTable { .call(); } - @Override - public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Put put) { - return this. newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), p), - (c, r) -> r.getProcessed())) - .call(); + private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { + + private final byte[] row; + + private final byte[] family; + + private byte[] qualifier; + + private CompareOperator op; + + private byte[] value; + + public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { + this.row = Preconditions.checkNotNull(row, "row is null"); + this.family = Preconditions.checkNotNull(family, "family is null"); + } + + @Override + public CheckAndMutateBuilder qualifier(byte[] qualifier) { + this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. 
Consider using" + + " an empty byte array, or just do not call this method if you want a null qualifier"); + return this; + } + + @Override + public CheckAndMutateBuilder ifNotExists() { + this.op = CompareOperator.EQUAL; + this.value = null; + return this; + } + + @Override + public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { + this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); + this.value = Preconditions.checkNotNull(value, "value is null"); + return this; + } + + private void preCheck() { + Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + + " calling ifNotExists/ifEquals/ifMatches before executing the request"); + } + + @Override + public CompletableFuture thenPut(Put put) { + preCheck(); + return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, + loc, stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), p), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture thenDelete(Delete delete) { + preCheck(); + return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, + loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), d), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture thenMutate(RowMutations mutation) { + preCheck(); + return RawAsyncTableImpl.this. newCaller(mutation, rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. 
mutateRow(controller, loc, + stub, mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), rm), + resp -> resp.getExists())) + .call(); + } } @Override - public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Delete delete) { - return this. newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), d), - (c, r) -> r.getProcessed())) - .call(); + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + return new CheckAndMutateBuilderImpl(row, family); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, @@ -283,7 +345,7 @@ class RawAsyncTableImpl implements RawAsyncTable { Function respConverter) { CompletableFuture future = new CompletableFuture<>(); try { - byte[] regionName = loc.getRegionInfo().getRegionName(); + byte[] regionName = loc.getRegion().getRegionName(); MultiRequest req = reqConvert.convert(regionName, mutation); stub.multi(controller, req, new RpcCallback() { @@ -328,18 +390,6 @@ class RawAsyncTableImpl implements RawAsyncTable { }, resp -> null)).call(); } - @Override - public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, RowMutations mutation) { - return this. newCaller(mutation, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. 
mutateRow(controller, loc, - stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), rm), - resp -> resp.getExists())) - .call(); - } - private Scan setDefaultScanConfig(Scan scan) { // always create a new scan object as we may reset the start row later. Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); @@ -488,7 +538,7 @@ class RawAsyncTableImpl implements RawAsyncTable { return; } unfinishedRequest.incrementAndGet(); - RegionInfo region = loc.getRegionInfo(); + RegionInfo region = loc.getRegion(); if (locateFinished(region, endKey, endKeyInclusive)) { locateFinished.set(true); } else { @@ -524,4 +574,5 @@ class RawAsyncTableImpl implements RawAsyncTable { (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/8597b19b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 2fea0eb..5d8b116 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -221,14 +221,15 @@ public class TestAsyncTable { AtomicInteger successIndex = new AtomicInteger(-1); int count = 10; CountDownLatch latch = new CountDownLatch(count); - IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null, - new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - latch.countDown(); - })); + IntStream.range(0, count) 
+ .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() + .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + latch.countDown(); + })); latch.await(); assertEquals(1, successCount.get()); String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); @@ -249,16 +250,17 @@ public class TestAsyncTable { AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); CountDownLatch deleteLatch = new CountDownLatch(count); - IntStream.range(0, count).forEach(i -> table - .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, - new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) - .thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - deleteLatch.countDown(); - })); + IntStream.range(0, count) + .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) + .thenDelete( + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); deleteLatch.await(); assertEquals(1, successCount.get()); Result result = table.get(new Get(row)).get(); @@ -311,13 +313,14 @@ public class TestAsyncTable { } catch (IOException e) { throw new UncheckedIOException(e); } - table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, mutation).thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - mutateLatch.countDown(); - }); + table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + mutateLatch.countDown(); + }); }); mutateLatch.await(); assertEquals(1, successCount.get());