hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [08/50] [abbrv] hbase git commit: HBASE-19680 BufferedMutatorImpl#mutate should wait the result from AP in order to throw the failed mutations
Date Thu, 22 Feb 2018 06:58:57 GMT
HBASE-19680 BufferedMutatorImpl#mutate should wait the result from AP in order to throw the
failed mutations


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dad90f6c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dad90f6c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dad90f6c

Branch: refs/heads/HBASE-19064
Commit: dad90f6cce5bc51e43e3778068d5e45bcb1c9de0
Parents: abf7de7
Author: Chia-Ping Tsai <chia7712@gmail.com>
Authored: Sat Feb 17 07:16:14 2018 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Sat Feb 17 07:33:58 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  54 +----
 .../hadoop/hbase/client/AsyncRequestFuture.java |   6 +-
 .../hbase/client/AsyncRequestFutureImpl.java    |   3 +-
 .../hbase/client/BufferedMutatorImpl.java       | 241 +++++++++----------
 .../hbase/client/ConnectionImplementation.java  |   2 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |   7 +-
 .../hadoop/hbase/client/HTableMultiplexer.java  |   2 +-
 .../apache/hadoop/hbase/client/RowAccess.java   |   4 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  90 +++----
 .../TestAsyncProcessWithRegionException.java    |   2 +-
 .../hbase/client/HConnectionTestingUtility.java |   2 +-
 11 files changed, 171 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 6c4118c..de7449b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -65,17 +65,12 @@ import org.apache.hadoop.hbase.util.Bytes;
  * The class manages internally the retries.
  * </p>
  * <p>
- * The class can be constructed in regular mode, or "global error" mode. In global error mode,
- * AP tracks errors across all calls (each "future" also has global view of all errors). That
- * mode is necessary for backward compat with HTable behavior, where multiple submissions are
- * made and the errors can propagate using any put/flush call, from previous calls.
- * In "regular" mode, the errors are tracked inside the Future object that is returned.
+ * The errors are tracked inside the Future object that is returned.
  * The results are always tracked inside the Future object and can be retrieved when the call
  * has finished. Partial results can also be retrieved if some part of multi-request failed.
  * </p>
  * <p>
- * This class is thread safe in regular mode; in global error code, submitting operations and
- * retrieving errors from different threads may be not thread safe.
+ * This class is thread safe.
  * Internally, the class is thread safe enough to manage simultaneously new submission and results
  * arising from older operations.
  * </p>
@@ -144,7 +139,6 @@ class AsyncProcess {
   final ClusterConnection connection;
   private final RpcRetryingCallerFactory rpcCallerFactory;
   final RpcControllerFactory rpcFactory;
-  final BatchErrors globalErrors;
 
   // Start configuration settings.
   final int startLogErrorsCnt;
@@ -168,14 +162,12 @@ class AsyncProcess {
   private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
   private final int periodToLog;
   AsyncProcess(ClusterConnection hc, Configuration conf,
-      RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
-      RpcControllerFactory rpcFactory) {
+      RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
     if (hc == null) {
       throw new IllegalArgumentException("ClusterConnection cannot be null.");
     }
 
     this.connection = hc;
-    this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
 
     this.id = COUNTER.incrementAndGet();
 
@@ -445,10 +437,10 @@ class AsyncProcess {
 
   private Consumer<Long> getLogger(TableName tableName, long max) {
     return (currentInProgress) -> {
-      LOG.info("#" + id + (max < 0 ? ", waiting for any free slot"
-      : ", waiting for some tasks to finish. Expected max="
-      + max) + ", tasksInProgress=" + currentInProgress +
-      " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
+      LOG.info("#" + id + (max < 0 ?
+          ", waiting for any free slot" :
+          ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
+          + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
     };
   }
 
@@ -460,38 +452,6 @@ class AsyncProcess {
   void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
     requestController.decTaskCounters(regions, sn);
   }
-  /**
-   * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
-   * @return Whether there were any errors in any request since the last time
-   *          {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created.
-   */
-  public boolean hasError() {
-    return globalErrors != null && globalErrors.hasErrors();
-  }
-
-  /**
-   * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
-   * Waits for all previous operations to finish, and returns errors and (optionally)
-   * failed operations themselves.
-   * @param failedRows an optional list into which the rows that failed since the last time
-   *        {@link #waitForAllPreviousOpsAndReset(List, TableName)} was called, or AP was created, are saved.
-   * @param tableName name of the table
-   * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, TableName)}
-   *          was called, or AP was created.
-   */
-  public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
-      List<Row> failedRows, TableName tableName) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0, tableName);
-    if (globalErrors == null || !globalErrors.hasErrors()) {
-      return null;
-    }
-    if (failedRows != null) {
-      failedRows.addAll(globalErrors.actions);
-    }
-    RetriesExhaustedWithDetailsException result = globalErrors.makeException(logBatchErrorDetails);
-    globalErrors.clear();
-    return result;
-  }
 
   /**
    * Create a caller. Isolated to be easily overridden in the tests.

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
index 90bd235..b91e094 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFuture.java
@@ -24,10 +24,8 @@ import java.io.InterruptedIOException;
 import java.util.List;
 
 /**
- * The context used to wait for results from one submit call.
- * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
- *    then errors and failed operations in this object will reflect global errors.
- * 2) If submit call is made with needResults false, results will not be saved.
+ * The context used to wait for results from one submit call. If submit call is made with
+ * needResults false, results will not be saved.
  * @since 2.0.0
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index ace74f9..a8b8ebf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -388,8 +388,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
             new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
     this.asyncProcess = asyncProcess;
     this.errorsByServer = createServerErrorTracker();
-    this.errors = (asyncProcess.globalErrors != null)
-        ? asyncProcess.globalErrors : new BatchErrors();
+    this.errors = new BatchErrors();
     this.operationTimeout = task.getOperationTimeout();
     this.rpcTimeout = task.getRpcTimeout();
     this.currentCallable = task.getCallable();

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index b171fc4..9d24b4d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -16,8 +16,11 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -141,7 +144,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
       RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
     this(conn, params,
       // puts need to track errors globally due to how the APIs currently work.
-      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
+      new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
+  }
+
+  private void checkClose() {
+    if (closed) {
+      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
+    }
   }
 
   @VisibleForTesting
@@ -173,16 +182,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
   @Override
   public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
-
-    if (closed) {
-      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
-    }
+    checkClose();
 
     long toAddSize = 0;
     int toAddCount = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
-        validatePut((Put) m);
+        HTable.validatePut((Put) m, maxKeyValueSize);
       }
       toAddSize += m.heapSize();
       ++toAddCount;
@@ -191,26 +197,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
     if (currentWriteBufferSize.get() == 0) {
       firstRecordInBufferTimestamp.set(System.currentTimeMillis());
     }
-
-    // This behavior is highly non-intuitive... it does not protect us against
-    // 94-incompatible behavior, which is a timing issue because hasError, the below code
-    // and setter of hasError are not synchronized. Perhaps it should be removed.
-    if (ap.hasError()) {
-      currentWriteBufferSize.addAndGet(toAddSize);
-      writeAsyncBuffer.addAll(ms);
-      undealtMutationCount.addAndGet(toAddCount);
-      backgroundFlushCommits(true);
-    } else {
-      currentWriteBufferSize.addAndGet(toAddSize);
-      writeAsyncBuffer.addAll(ms);
-      undealtMutationCount.addAndGet(toAddCount);
-    }
-
-    // Now try and queue what needs to be queued.
-    while (undealtMutationCount.get() != 0
-        && currentWriteBufferSize.get() > writeBufferSize) {
-      backgroundFlushCommits(false);
-    }
+    currentWriteBufferSize.addAndGet(toAddSize);
+    writeAsyncBuffer.addAll(ms);
+    undealtMutationCount.addAndGet(toAddCount);
+    doFlush(false);
   }
 
   @VisibleForTesting
@@ -238,118 +228,40 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
   }
 
-  // validate for well-formedness
-  public void validatePut(final Put put) throws IllegalArgumentException {
-    HTable.validatePut(put, maxKeyValueSize);
-  }
-
   @Override
   public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // Stop any running Periodic Flush timer.
+    disableWriteBufferPeriodicFlush();
     try {
-      if (this.closed) {
-        return;
-      }
-
-      // Stop any running Periodic Flush timer.
-      disableWriteBufferPeriodicFlush();
-
       // As we can have an operation in progress even if the buffer is empty, we call
-      // backgroundFlushCommits at least one time.
-      backgroundFlushCommits(true);
+      // doFlush at least one time.
+      doFlush(true);
+    } finally {
       if (cleanupPoolOnClose) {
         this.pool.shutdown();
-        boolean terminated;
-        int loopCnt = 0;
-        do {
-          // wait until the pool has terminated
-          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
-          loopCnt += 1;
-          if (loopCnt >= 10) {
+        try {
+          if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {
             LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
-            break;
           }
-        } while (!terminated);
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("waitForTermination interrupted");
-    } finally {
-      this.closed = true;
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws InterruptedIOException,
-      RetriesExhaustedWithDetailsException {
-    // As we can have an operation in progress even if the buffer is empty, we call
-    // backgroundFlushCommits at least one time.
-    backgroundFlushCommits(true);
-  }
-
-  /**
-   * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
-   * the is an error (max retried reach from a previous flush or bad operation), it tries to send
-   * all operations in the buffer and sends an exception.
-   *
-   * @param synchronous - if true, sends all the writes and wait for all of them to finish before
-   *        returning.
-   */
-  private void backgroundFlushCommits(boolean synchronous) throws
-      InterruptedIOException,
-      RetriesExhaustedWithDetailsException {
-    if (!synchronous && writeAsyncBuffer.isEmpty()) {
-      return;
-    }
-
-    if (!synchronous) {
-      QueueRowAccess taker = new QueueRowAccess();
-      AsyncProcessTask task = wrapAsyncProcessTask(taker);
-      try {
-        ap.submit(task);
-        if (ap.hasError()) {
-          LOG.debug(tableName + ": One or more of the operations have failed -"
-              + " waiting for all operation in progress to finish (successfully or not)");
-        }
-      } finally {
-        taker.restoreRemainder();
-      }
-    }
-    if (synchronous || ap.hasError()) {
-      QueueRowAccess taker = new QueueRowAccess();
-      AsyncProcessTask task = wrapAsyncProcessTask(taker);
-      try {
-        while (!taker.isEmpty()) {
-          ap.submit(task);
-          taker.reset();
-        }
-      } finally {
-        taker.restoreRemainder();
-      }
-      RetriesExhaustedWithDetailsException error =
-          ap.waitForAllPreviousOpsAndReset(null, tableName);
-      if (error != null) {
-        if (listener == null) {
-          throw error;
-        } else {
-          this.listener.onException(error, this);
+        } catch (InterruptedException e) {
+          LOG.warn("waitForTermination interrupted");
+          Thread.currentThread().interrupt();
         }
       }
+      closed = true;
     }
   }
 
-  /**
-   * Reuse the AsyncProcessTask when calling
-   * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
-   * @param taker access the inner buffer.
-   * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
-   */
-  private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
-    AsyncProcessTask task = AsyncProcessTask.newBuilder()
+  private AsyncProcessTask createTask(QueueRowAccess access) {
+    return new AsyncProcessTask(AsyncProcessTask.newBuilder()
         .setPool(pool)
         .setTableName(tableName)
-        .setRowAccess(taker)
+        .setRowAccess(access)
         .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-        .build();
-    return new AsyncProcessTask(task) {
+        .build()) {
       @Override
       public int getRpcTimeout() {
         return rpcTimeout.get();
@@ -362,6 +274,72 @@ public class BufferedMutatorImpl implements BufferedMutator {
     };
   }
 
+  @Override
+  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+    checkClose();
+    doFlush(true);
+  }
+
+  /**
+   * Send the operations in the buffer to the servers.
+   *
+   * @param flushAll - if true, sends all the writes and wait for all of them to finish before
+   *                 returning. Otherwise, flush until buffer size is smaller than threshold
+   */
+  private void doFlush(boolean flushAll) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
+    while (true) {
+      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
+        // There is the room to accept more mutations.
+        break;
+      }
+      AsyncRequestFuture asf;
+      try (QueueRowAccess access = new QueueRowAccess()) {
+        if (access.isEmpty()) {
+          // It means someone has gotten the ticker to run the flush.
+          break;
+        }
+        asf = ap.submit(createTask(access));
+      }
+      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
+      // be released.
+      asf.waitUntilDone();
+      if (asf.hasError()) {
+        errors.add(asf.getErrors());
+      }
+    }
+
+    RetriesExhaustedWithDetailsException exception = makeException(errors);
+    if (exception == null) {
+      return;
+    } else if(listener == null) {
+      throw exception;
+    } else {
+      listener.onException(exception, this);
+    }
+  }
+
+  private static RetriesExhaustedWithDetailsException makeException(
+    List<RetriesExhaustedWithDetailsException> errors) {
+    switch (errors.size()) {
+      case 0:
+        return null;
+      case 1:
+        return errors.get(0);
+      default:
+        List<Throwable> exceptions = new ArrayList<>();
+        List<Row> actions = new ArrayList<>();
+        List<String> hostnameAndPort = new ArrayList<>();
+        errors.forEach(e -> {
+          exceptions.addAll(e.exceptions);
+          actions.addAll(e.actions);
+          hostnameAndPort.addAll(e.hostnameAndPort);
+        });
+        return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort);
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -433,12 +411,15 @@ public class BufferedMutatorImpl implements BufferedMutator {
     return undealtMutationCount.get();
   }
 
-  private class QueueRowAccess implements RowAccess<Row> {
+  private class QueueRowAccess implements RowAccess<Row>, Closeable {
     private int remainder = undealtMutationCount.getAndSet(0);
 
-    void reset() {
-      restoreRemainder();
-      remainder = undealtMutationCount.getAndSet(0);
+    @Override
+    public void close() {
+      if (remainder > 0) {
+        undealtMutationCount.addAndGet(remainder);
+        remainder = 0;
+      }
     }
 
     @Override
@@ -474,6 +455,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
           iter.remove();
           currentWriteBufferSize.addAndGet(-last.heapSize());
           --remainder;
+          last = null;
         }
       };
     }
@@ -483,13 +465,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
       return remainder;
     }
 
-    void restoreRemainder() {
-      if (remainder > 0) {
-        undealtMutationCount.addAndGet(remainder);
-        remainder = 0;
-      }
-    }
-
     @Override
     public boolean isEmpty() {
       return remainder <= 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8807884..a5a0188 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -261,7 +261,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
-    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
+    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
     if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
       this.metrics = new MetricsConnection(this);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 855005e..a4289e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1182,10 +1182,9 @@ public class HTable implements Table {
     final List<String> callbackErrorServers = new ArrayList<>();
     Object[] results = new Object[execs.size()];
 
-    AsyncProcess asyncProcess =
-        new AsyncProcess(connection, configuration,
-            RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
-            true, RpcControllerFactory.instantiate(configuration));
+    AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
+        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
+        RpcControllerFactory.instantiate(configuration));
 
     Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
     = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 4be39a9..e6b061e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -452,7 +452,7 @@ public class HTableMultiplexer {
               HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
       this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
           HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
+      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
       this.executor = executor;
       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
       this.pool = pool;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
index 16921fe..9f92c66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -27,7 +25,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * of elements between collections.
  * @param <T>
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 public interface RowAccess<T> extends Iterable<T> {
   /**
    * @return true if there are no elements.

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index fc405df..2979dcd 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -171,22 +170,17 @@ public class TestAsyncProcess {
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
-      this(hc, conf, new AtomicInteger());
+      super(hc, conf,
+          new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
+      service = Executors.newFixedThreadPool(5);
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
-      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
       service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
           new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
     }
 
-    public MyAsyncProcess(
-        ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
-      super(hc, conf,
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
-      service = Executors.newFixedThreadPool(5);
-    }
-
     public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
         List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
         boolean needResults) throws InterruptedIOException {
@@ -324,7 +318,7 @@ public class TestAsyncProcess {
     private final IOException ioe;
 
     public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
-      super(hc, conf, true);
+      super(hc, conf);
       this.ioe = ioe;
       serverTrackerTimeout = 1L;
     }
@@ -656,7 +650,7 @@ public class TestAsyncProcess {
         + ", minCountSnRequest:" + minCountSnRequest
         + ", minCountSn2Request:" + minCountSn2Request);
 
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
       mutator.mutate(puts);
@@ -807,7 +801,7 @@ public class TestAsyncProcess {
 
   @Test
   public void testFail() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
 
     List<Put> puts = new ArrayList<>(1);
     Put p = createPut(1, false);
@@ -834,7 +828,7 @@ public class TestAsyncProcess {
   @Test
   public void testSubmitTrue() throws IOException {
     ClusterConnection conn = createHConnection();
-    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     final String defaultClazz =
         conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
     conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@@ -882,7 +876,7 @@ public class TestAsyncProcess {
 
   @Test
   public void testFailAndSuccess() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
 
     List<Put> puts = new ArrayList<>(3);
     puts.add(createPut(1, false));
@@ -909,7 +903,7 @@ public class TestAsyncProcess {
 
   @Test
   public void testFlush() throws Exception {
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
 
     List<Put> puts = new ArrayList<>(3);
     puts.add(createPut(1, false));
@@ -927,7 +921,7 @@ public class TestAsyncProcess {
   @Test
   public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
     ClusterConnection hc = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
+    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
     testTaskCount(ap);
   }
 
@@ -944,7 +938,7 @@ public class TestAsyncProcess {
         conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
     conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
       SimpleRequestController.class.getName());
-    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
     testTaskCount(ap);
     if (defaultClazz != null) {
       conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
@@ -981,7 +975,7 @@ public class TestAsyncProcess {
         conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
     conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
       SimpleRequestController.class.getName());
-    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     SimpleRequestController controller = (SimpleRequestController) ap.requestController;
 
 
@@ -1087,7 +1081,7 @@ public class TestAsyncProcess {
   @Test
   public void testHTablePutSuccess() throws Exception {
     ClusterConnection conn = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
 
@@ -1104,7 +1098,7 @@ public class TestAsyncProcess {
   @Test
   public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
     ClusterConnection conn = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
 
     checkPeriodicFlushParameters(conn, ap,
             1234, 1234,
@@ -1150,7 +1144,7 @@ public class TestAsyncProcess {
   @Test
   public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
     ClusterConnection conn = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
 
     bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
@@ -1217,7 +1211,7 @@ public class TestAsyncProcess {
   @Test
   public void testBufferedMutatorImplWithSharedPool() throws Exception {
     ClusterConnection conn = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
 
@@ -1226,30 +1220,27 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void testHTableFailedPutAndNewPut() throws Exception {
+  public void testFailedPutAndNewPut() throws Exception {
     ClusterConnection conn = createHConnection();
-    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
             .writeBufferSize(0);
     BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
 
     Put p = createPut(1, false);
-    mutator.mutate(p);
-
-    ap.waitForMaximumCurrentTasks(0, null); // Let's do all the retries.
-
-    // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the
-    //  doPut if it fails.
-    // This said, it's not a very easy going behavior. For example, when we insert a list of
-    //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
-    //  manage what was inserted, what was tried but failed, and what was not even tried.
-    p = createPut(1, true);
-    Assert.assertEquals(0, mutator.size());
     try {
       mutator.mutate(p);
       Assert.fail();
-    } catch (RetriesExhaustedException expected) {
+    } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
+    // Let's do all the retries.
+    ap.waitForMaximumCurrentTasks(0, null);
+    Assert.assertEquals(0, mutator.size());
+
+    // There is no global error so the new put should not fail
+    mutator.mutate(createPut(1, true));
     Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
   }
 
@@ -1277,7 +1268,7 @@ public class TestAsyncProcess {
   public void testBatch() throws IOException, InterruptedException {
     ClusterConnection conn = new MyConnectionImpl(CONF);
     HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
-    ht.multiAp = new MyAsyncProcess(conn, CONF, false);
+    ht.multiAp = new MyAsyncProcess(conn, CONF);
 
     List<Put> puts = new ArrayList<>(7);
     puts.add(createPut(1, true));
@@ -1307,7 +1298,7 @@ public class TestAsyncProcess {
   public void testErrorsServers() throws IOException {
     Configuration configuration = new Configuration(CONF);
     ClusterConnection conn = new MyConnectionImpl(configuration);
-    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
     BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
     configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
@@ -1323,21 +1314,22 @@ public class TestAsyncProcess {
       mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
     // Checking that the ErrorsServers came into play and didn't make us stop immediately
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
   }
 
-  @Ignore @Test // Test is failing with wrong count. FIX!!
+  @Test
   public void testReadAndWriteTimeout() throws IOException {
     final long readTimeout = 10 * 1000;
     final long writeTimeout = 20 * 1000;
     Configuration copyConf = new Configuration(CONF);
     copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
     copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
-    ClusterConnection conn = createHConnection();
-    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
-    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+    ClusterConnection conn = new MyConnectionImpl(copyConf);
+    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
     try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
       ht.multiAp = ap;
       List<Get> gets = new LinkedList<>();
@@ -1368,7 +1360,7 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void testGlobalErrors() throws IOException {
+  public void testErrors() throws IOException {
     ClusterConnection conn = new MyConnectionImpl(CONF);
     AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
     BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
@@ -1383,6 +1375,8 @@ public class TestAsyncProcess {
       mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
     // Checking that the ErrorsServers came into play and didn't make us stop immediately
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@@ -1404,6 +1398,8 @@ public class TestAsyncProcess {
       mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
     // Checking that the ErrorsServers came into play and didn't make us stop immediately
     Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
@@ -1703,7 +1699,7 @@ public class TestAsyncProcess {
 
   static class AsyncProcessForThrowableCheck extends AsyncProcess {
     public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
-      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
+      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
           conf));
     }
   }
@@ -1759,6 +1755,8 @@ public class TestAsyncProcess {
       mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
     long actualSleep = System.currentTimeMillis() - startTime;
     long expectedSleep = 0L;
@@ -1784,6 +1782,8 @@ public class TestAsyncProcess {
       mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
+      assertEquals(1, expected.getNumExceptions());
+      assertTrue(expected.getRow(0) == p);
     }
     actualSleep = System.currentTimeMillis() - startTime;
     expectedSleep = 0L;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
index c46385e..ffc4e51 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
@@ -202,7 +202,7 @@ public class TestAsyncProcessWithRegionException {
     private final ExecutorService service = Executors.newFixedThreadPool(5);
 
     MyAsyncProcess(ClusterConnection hc, Configuration conf) {
-      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
     }
 
     public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)

http://git-wip-us.apache.org/repos/asf/hbase/blob/dad90f6c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 8ef784c..0f896b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -127,7 +127,7 @@ public class HConnectionTestingUtility {
     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
     Mockito.when(c.getNonceGenerator()).thenReturn(ng);
     Mockito.when(c.getAsyncProcess()).thenReturn(
-      new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), false,
+      new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf),
           RpcControllerFactory.instantiate(conf)));
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
         RpcRetryingCallerFactory.instantiate(conf,


Mime
View raw message