hbase-commits mailing list archives

From ndimi...@apache.org
Subject [3/3] hbase git commit: HBASE-12728 buffered writes substantially less useful after removal of HTablePool (Solomon Duskis and Nick Dimiduk)
Date Fri, 23 Jan 2015 17:14:01 GMT
HBASE-12728 buffered writes substantially less useful after removal of HTablePool (Solomon Duskis and Nick Dimiduk)

In our pre-1.0 API, HTable is considered a light-weight object that is consumed by
a single thread at a time. The HTablePool class provided a means of sharing
multiple HTable instances across a number of threads. As an optimization,
HTable managed a "write buffer", accumulating edits and sending a "batch" all
at once. By default the batch was sent as the last step in invocations of
put(Put) and put(List<Put>). The user could disable the automatic flushing of
the write buffer, retaining edits locally and only sending the whole "batch"
once the write buffer has filled or when the flushCommits() method is invoked
explicitly. Explicit or implicit batch writing was controlled by the
setAutoFlushTo(boolean) method. A value of true (the default) had the write
buffer flushed at the completion of a call to put(Put) or put(List<Put>). A
value of false allowed for explicit buffer management. HTable also exposed the
buffer to consumers via getWriteBuffer().

The combination of HTable with setAutoFlushTo(false) and the HTablePool
provided a convenient mechanism by which multiple "Put-producing" threads could
share a common write buffer. Both HTablePool and HTable are deprecated, and
they are officially replaced in the new 1.0 API by Table and BufferedMutator.
Table, which replaces HTable, no longer exposes explicit write-buffer
management. Instead, explicit buffer management is exposed via BufferedMutator.
BufferedMutator is made safe for concurrent use. Where code would previously
retrieve and return HTables from an HTablePool, now that code creates and
shares a single BufferedMutator instance across all threads.
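
The sharing pattern described above can be illustrated with a minimal,
self-contained sketch. This is not HBase code and not the BufferedMutator
implementation: SharedWriteBuffer, its String "edits", and its in-memory list
of batches are hypothetical stand-ins, assumed here only to show how several
producer threads can share one synchronized, size-bounded buffer that sends a
batch once a threshold is exceeded and again on an explicit flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a shared, size-bounded write buffer (hypothetical, not HBase code). */
public class SharedWriteBuffer {
  private final long maxBytes;                                   // flush threshold
  private final List<String> pending = new ArrayList<>();        // edits not yet sent
  private final List<List<String>> batches = new ArrayList<>();  // stands in for the cluster
  private long pendingBytes = 0;

  public SharedWriteBuffer(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Buffers one edit; sends a batch once the buffered size exceeds the threshold. */
  public synchronized void mutate(String edit) {
    pending.add(edit);
    pendingBytes += edit.length();
    if (pendingBytes > maxBytes) {
      flush();
    }
  }

  /** Sends everything currently buffered as a single batch. */
  public synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    batches.add(new ArrayList<>(pending));
    pending.clear();
    pendingBytes = 0;
  }

  public synchronized int batchCount() {
    return batches.size();
  }

  public synchronized int sentEdits() {
    int n = 0;
    for (List<String> batch : batches) {
      n += batch.size();
    }
    return n;
  }

  public static void main(String[] args) throws InterruptedException {
    // One buffer instance shared by all producer threads.
    SharedWriteBuffer buffer = new SharedWriteBuffer(64);
    Thread[] producers = new Thread[4];
    for (int t = 0; t < producers.length; t++) {
      final int id = t;
      producers[t] = new Thread(() -> {
        for (int i = 0; i < 100; i++) {
          buffer.mutate("row-" + id + "-" + i);
        }
      });
      producers[t].start();
    }
    for (Thread p : producers) {
      p.join();
    }
    buffer.flush(); // final explicit flush of whatever is still buffered
    System.out.println("edits sent: " + buffer.sentEdits());
  }
}
```

In the sketch every method that touches the buffer is synchronized, which is
the simplest way to make a single instance safe to share; the real client
additionally submits batches asynchronously, which this model leaves out.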

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
	hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
	hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
	hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
	hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
	hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
	hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
	hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7bbbaaeb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7bbbaaeb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7bbbaaeb

Branch: refs/heads/branch-1.0
Commit: 7bbbaaeb53295d3a50b9d863f3c01801df0a78b4
Parents: 8f75067
Author: Nick Dimiduk <ndimiduk@apache.org>
Authored: Fri Jan 23 09:13:24 2015 -0800
Committer: Nick Dimiduk <ndimiduk@apache.org>
Committed: Fri Jan 23 09:13:24 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/BufferedMutator.java    | 129 ++++++++++
 .../hbase/client/BufferedMutatorImpl.java       | 258 +++++++++++++++++++
 .../hbase/client/BufferedMutatorParams.java     | 110 ++++++++
 .../apache/hadoop/hbase/client/Connection.java  |  31 +++
 .../hadoop/hbase/client/ConnectionAdapter.java  |  11 +
 .../hadoop/hbase/client/ConnectionManager.java  |  24 +-
 .../org/apache/hadoop/hbase/client/HTable.java  | 141 ++++------
 .../hadoop/hbase/client/HTableInterface.java    |  64 ++++-
 .../org/apache/hadoop/hbase/client/Table.java   |  32 +--
 .../hadoop/hbase/client/TableConfiguration.java |  20 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  89 +++----
 .../hbase/client/TestClientNoCluster.java       |  49 ++--
 .../client/example/BufferedMutatorExample.java  | 119 +++++++++
 .../test/IntegrationTestBigLinkedList.java      |  25 +-
 ...egrationTestBigLinkedListWithVisibility.java |   9 +-
 .../test/IntegrationTestLoadAndVerify.java      |  44 ++--
 ...tionTestWithCellVisibilityLoadAndVerify.java |   4 +-
 .../trace/IntegrationTestSendTraceRequests.java |   8 +-
 .../apache/hadoop/hbase/rest/RowResource.java   |   3 -
 .../hbase/rest/PerformanceEvaluation.java       | 103 +++++---
 .../hbase/rest/client/TestRemoteTable.java      |  15 +-
 .../hadoop/hbase/client/HTableWrapper.java      |   8 +-
 .../hadoop/hbase/mapred/TableOutputFormat.java  |  19 +-
 .../hbase/mapreduce/MultiTableOutputFormat.java |  35 ++-
 .../hbase/mapreduce/TableOutputFormat.java      |  28 +-
 .../hadoop/hbase/PerformanceEvaluation.java     |  82 ++++--
 .../hadoop/hbase/client/TestClientPushback.java |   2 +-
 .../client/TestCloneSnapshotFromClient.java     |  37 ++-
 .../hadoop/hbase/client/TestFromClientSide.java |  32 +--
 .../hadoop/hbase/client/TestMultiParallel.java  |  32 +--
 .../client/TestRestoreSnapshotFromClient.java   |  16 +-
 .../hbase/client/TestRpcControllerFactory.java  |  10 +-
 .../hbase/coprocessor/TestHTableWrapper.java    |   6 +-
 .../master/TestDistributedLogSplitting.java     |  10 +-
 .../apache/hadoop/hbase/master/TestMaster.java  |  10 +-
 .../TestEndToEndSplitTransaction.java           |   6 +-
 .../hbase/regionserver/TestFSErrorsExposed.java |  32 ++-
 .../regionserver/TestRegionFavoredNodes.java    |   1 +
 .../regionserver/TestRegionServerMetrics.java   |  91 ++++---
 .../regionserver/TestScannerWithBulkload.java   |   8 +-
 .../hbase/regionserver/wal/TestLogRolling.java  |   6 +-
 ...estReplicationChangingPeerRegionservers.java |   7 +-
 .../replication/TestReplicationSmallTests.java  |   3 +-
 .../replication/TestReplicationWithTags.java    |   5 +-
 .../TestVisibilityLabelsReplication.java        |  29 +--
 .../hbase/snapshot/SnapshotTestingUtils.java    |  29 ++-
 .../snapshot/TestFlushSnapshotFromClient.java   |  16 +-
 .../TestRestoreFlushSnapshotFromClient.java     |   8 +-
 48 files changed, 1277 insertions(+), 579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
new file mode 100644
index 0000000..3b91078
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <p>Used to communicate with a single HBase table similar to {@link Table} but meant for
+ * batched, asynchronous puts. Obtain an instance from a {@link Connection} and call
+ * {@link #close()} afterwards. Customizations can be applied to the {@code BufferedMutator} via
+ * the {@link BufferedMutatorParams}.
+ * </p>
+ *
+ * <p>Exceptions are handled asynchronously via the {@link BufferedMutator.ExceptionListener}.
+ * The default implementation is to throw the exception upon receipt. This behavior can be
+ * overridden with a custom implementation, provided as a parameter with
+ * {@link BufferedMutatorParams#listener(BufferedMutator.ExceptionListener)}.</p>
+ *
+ * <p>Map/Reduce jobs are good use cases for using {@code BufferedMutator}. Map/reduce jobs
+ * benefit from batching, but have no natural flush point. {@code BufferedMutator} receives the
+ * puts from the M/R job and will batch puts based on some heuristic, such as the accumulated size
+ * of the puts, and submit batches of puts asynchronously so that the M/R logic can continue
+ * without interruption.
+ * </p>
+ *
+ * <p>{@code BufferedMutator} can also be used in more exotic circumstances. Map/Reduce batch jobs
+ * will have a single {@code BufferedMutator} per thread. A single {@code BufferedMutator} can
+ * also be effectively used in high volume online systems to batch puts, with the caveat that
+ * extreme circumstances, such as JVM or machine failure, may cause some data loss.</p>
+ *
+ * <p>NOTE: This class replaces the functionality that used to be available via
+ * {@link HTableInterface#setAutoFlush(boolean)} set to {@code false}.
+ * </p>
+ *
+ * <p>See also the {@code BufferedMutatorExample} in the hbase-examples module.</p>
+ * @see ConnectionFactory
+ * @see Connection
+ * @since 1.0.0
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BufferedMutator extends Closeable {
+  /**
+   * Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will
+   * affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the
+   * wire as part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
+   *
+   * @param mutation The data to send.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void mutate(Mutation mutation) throws IOException;
+
+  /**
+   * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the
+   * wire as part of a batch. There is no guarantee of sending entire content of {@code mutations}
+   * in a single batch; it will be broken up according to the write buffer capacity.
+   *
+   * @param mutations The data to send.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void mutate(List<? extends Mutation> mutations) throws IOException;
+
+  /**
+   * Performs a {@link #flush()} and releases any resources held.
+   *
+   * @throws IOException if a remote or network exception occurs.
+   */
+  @Override
+  void close() throws IOException;
+
+  /**
+   * Executes all the buffered, asynchronous {@link Mutation} operations and waits until they
+   * are done.
+   *
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Returns the maximum size in bytes of the write buffer for this BufferedMutator.
+   * <p>
+   * The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   */
+  long getWriteBufferSize();
+
+  /**
+   * Listens for asynchronous exceptions on a {@link BufferedMutator}.
+   */
+  interface ExceptionListener {
+    public void onException(RetriesExhaustedWithDetailsException exception,
+        BufferedMutator mutator) throws RetriesExhaustedWithDetailsException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
new file mode 100644
index 0000000..54e7ccd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -0,0 +1,258 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Used to communicate with a single HBase table similar to {@link HTable}
+ * but meant for batched, potentially asynchronous puts. Obtain an instance from
+ * a {@link Connection} and call {@link #close()} afterwards.
+ * </p>
+ *
+ * @see ConnectionFactory
+ * @see Connection
+ * @since 1.0.0
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class BufferedMutatorImpl implements BufferedMutator {
+
+  private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
+  
+  private final ExceptionListener listener;
+
+  protected ClusterConnection connection; // non-final so can be overridden in test
+  private final TableName tableName;
+  private volatile Configuration conf;
+  private List<Row> writeAsyncBuffer = new LinkedList<>();
+  private long writeBufferSize;
+  private final int maxKeyValueSize;
+  protected long currentWriteBufferSize = 0;
+  private boolean closed = false;
+  private final ExecutorService pool;
+  protected AsyncProcess ap; // non-final so can be overridden in test
+
+  BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
+      RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+    if (conn == null || conn.isClosed()) {
+      throw new IllegalArgumentException("Connection is null or closed.");
+    }
+
+    this.tableName = params.getTableName();
+    this.connection = conn;
+    this.conf = connection.getConfiguration();
+    this.pool = params.getPool();
+    this.listener = params.getListener();
+
+    TableConfiguration tableConf = new TableConfiguration(conf);
+    this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
+        params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+    this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
+        params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
+
+    // puts need to track errors globally due to how the APIs currently work.
+    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
+  }
+
+  @Override
+  public TableName getName() {
+    return tableName;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public synchronized void mutate(Mutation m) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    doMutate(m);
+  }
+
+  @Override
+  public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    for (Mutation m : ms) {
+      doMutate(m);
+    }
+  }
+
+  /**
+   * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
+   * cluster.
+   *
+   * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
+   * @throws InterruptedIOException if we were interrupted.
+   */
+  private void doMutate(Mutation m) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    if (closed) {
+      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
+    }
+    if (!(m instanceof Put) && !(m instanceof Delete)) {
+      throw new IllegalArgumentException("Pass a Delete or a Put");
+    }
+
+    // This behavior is highly non-intuitive... it does not protect us against
+    // 94-incompatible behavior, which is a timing issue because hasError, the below code
+    // and setter of hasError are not synchronized. Perhaps it should be removed.
+    if (ap.hasError()) {
+      writeAsyncBuffer.add(m);
+      backgroundFlushCommits(true);
+    }
+
+    if (m instanceof Put) {
+      validatePut((Put) m);
+    }
+
+    currentWriteBufferSize += m.heapSize();
+    writeAsyncBuffer.add(m);
+
+    while (currentWriteBufferSize > writeBufferSize) {
+      backgroundFlushCommits(false);
+    }
+  }
+
+  // validate for well-formedness
+  public void validatePut(final Put put) throws IllegalArgumentException {
+    HTable.validatePut(put, maxKeyValueSize);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+    try {
+      // As we can have an operation in progress even if the buffer is empty, we call
+      // backgroundFlushCommits at least one time.
+      backgroundFlushCommits(true);
+      this.pool.shutdown();
+      boolean terminated = false;
+      int loopCnt = 0;
+      do {
+        // wait until the pool has terminated
+        terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+        loopCnt += 1;
+        if (loopCnt >= 10) {
+          LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
+          break;
+        }
+      } while (!terminated);
+    } catch (InterruptedException e) {
+      LOG.warn("waitForTermination interrupted");
+    } finally {
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    // As we can have an operation in progress even if the buffer is empty, we call
+    // backgroundFlushCommits at least one time.
+    backgroundFlushCommits(true);
+  }
+
+  /**
+   * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
+   * there is an error (max retries reached from a previous flush or bad operation), it tries to
+   * send all operations in the buffer and throws an exception.
+   *
+   * @param synchronous - if true, sends all the writes and waits for all of them to finish before
+   *        returning.
+   */
+  private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
+    try {
+      if (!synchronous) {
+        ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        if (ap.hasError()) {
+          LOG.debug(tableName + ": One or more of the operations have failed -"
+              + " waiting for all operation in progress to finish (successfully or not)");
+        }
+      }
+      if (synchronous || ap.hasError()) {
+        while (!writeAsyncBuffer.isEmpty()) {
+          ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        }
+        RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
+        if (error != null) {
+          if (listener == null) {
+            throw error;
+          } else {
+            this.listener.onException(error, this);
+          }
+        }
+      }
+    } finally {
+      currentWriteBufferSize = 0;
+      for (Row mut : writeAsyncBuffer) {
+        if (mut instanceof Mutation) {
+          currentWriteBufferSize += ((Mutation) mut).heapSize();
+        }
+      }
+    }
+  }
+
+  /**
+   * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
+   * not be called for production uses.
+   * @deprecated Going away when we drop public support for {@link HTableInterface}.
+   */
+  @Deprecated
+  public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
+      InterruptedIOException {
+    this.writeBufferSize = writeBufferSize;
+    if (currentWriteBufferSize > writeBufferSize) {
+      flush();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getWriteBufferSize() {
+    return this.writeBufferSize;
+  }
+
+  /**
+   * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not be
+   * called from production uses.
+   * @deprecated Going away when we drop public support for {@link HTableInterface}.
+   */
+  @Deprecated
+  public List<Row> getWriteBuffer() {
+    return this.writeAsyncBuffer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
new file mode 100644
index 0000000..d4cdead
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Parameters for instantiating a {@link BufferedMutator}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BufferedMutatorParams {
+
+  static final int UNSET = -1;
+
+  private final TableName tableName;
+  private long writeBufferSize = UNSET;
+  private int maxKeyValueSize = UNSET;
+  private ExecutorService pool = null;
+  private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+    @Override
+    public void onException(RetriesExhaustedWithDetailsException exception,
+        BufferedMutator bufferedMutator)
+        throws RetriesExhaustedWithDetailsException {
+      throw exception;
+    }
+  };
+
+  public BufferedMutatorParams(TableName tableName) {
+    this.tableName = tableName;
+  }
+
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  /**
+   * Override the write buffer size specified by the provided {@link Connection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+   * {@code hbase.client.write.buffer}.
+   */
+  public BufferedMutatorParams writeBufferSize(long writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  public int getMaxKeyValueSize() {
+    return maxKeyValueSize;
+  }
+
+  /**
+   * Override the maximum key-value size specified by the provided {@link Connection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+   * {@code hbase.client.keyvalue.maxsize}.
+   */
+  public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) {
+    this.maxKeyValueSize = maxKeyValueSize;
+    return this;
+  }
+
+  public ExecutorService getPool() {
+    return pool;
+  }
+
+  /**
+   * Override the default executor pool defined by the {@code hbase.htable.threads.*}
+   * configuration values.
+   */
+  public BufferedMutatorParams pool(ExecutorService pool) {
+    this.pool = pool;
+    return this;
+  }
+
+  public BufferedMutator.ExceptionListener getListener() {
+    return listener;
+  }
+
+  /**
+   * Override the default error handler. Default handler simply rethrows the exception.
+   */
+  public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) {
+    this.listener = listener;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index 92b3f04..2791d61 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -99,6 +99,37 @@ public interface Connection extends Abortable, Closeable {
   Table getTable(TableName tableName, ExecutorService pool)  throws IOException;
 
   /**
+   * <p>
+   * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
+   * {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will
+   * use the Connection's ExecutorService. This object can be used for long lived operations.
+   * </p>
+   * <p>
+   * The caller is responsible for calling {@link BufferedMutator#close()} on
+   * the returned {@link BufferedMutator} instance.
+   * </p>
+   * <p>
+   * This accessor will use the connection's ExecutorService and will throw an
+   * exception in the main thread when an asynchronous exception occurs.
+   *
+   * @param tableName the name of the table
+   *
+   * @return a {@link BufferedMutator} for the supplied tableName.
+   */
+  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
+
+  /**
+   * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
+   * {@link BufferedMutator} returned by this method is thread-safe. This object can be used for
+   * long lived table operations. The caller is responsible for calling
+   * {@link BufferedMutator#close()} on the returned {@link BufferedMutator} instance.
+   *
+   * @param params details on how to instantiate the {@code BufferedMutator}.
+   * @return a {@link BufferedMutator} for the supplied tableName.
+   */
+  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException;
+
+  /**
    * Retrieve a RegionLocator implementation to inspect region information on a table. The returned
    * RegionLocator is not thread-safe, so a new instance should be created for each using thread.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index a1b71f4..eb9842f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -111,6 +111,17 @@ abstract class ConnectionAdapter implements ClusterConnection {
   }
 
   @Override
+  public BufferedMutator getBufferedMutator(BufferedMutatorParams params)
+      throws IOException {
+    return wrappedConnection.getBufferedMutator(params);
+  }
+
+  @Override
+  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+    return wrappedConnection.getBufferedMutator(tableName);
+  }
+
+  @Override
   public RegionLocator getRegionLocator(TableName tableName) throws IOException {
     return wrappedConnection.getRegionLocator(tableName);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 70172a4..9361f61 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
-import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -713,6 +713,28 @@ class ConnectionManager {
     }
 
     @Override
+    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
+      if (params.getTableName() == null) {
+        throw new IllegalArgumentException("TableName cannot be null.");
+      }
+      if (params.getPool() == null) {
+        params.pool(HTable.getDefaultExecutor(getConfiguration()));
+      }
+      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
+        params.writeBufferSize(tableConfig.getWriteBufferSize());
+      }
+      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
+        params.maxKeyValueSize(tableConfig.getMaxKeyValueSize());
+      }
+      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(TableName tableName) {
+      return getBufferedMutator(new BufferedMutatorParams(tableName));
+    }
+
+    @Override
     public RegionLocator getRegionLocator(TableName tableName) throws IOException {
       if (managed) {
         throw new IOException("The connection has to be unmanaged.");
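
Note on the hunk above: getBufferedMutator(BufferedMutatorParams) rejects a null table name and then fills in every field the caller left at the UNSET sentinel. A minimal, self-contained sketch of that pattern follows. Params, resolve and the value chosen for UNSET are hypothetical stand-ins for illustration, not the HBase classes, and the defaults are passed in by the caller rather than read from a TableConfiguration.

```java
// Hypothetical stand-in for the "fill unset params with defaults" step; not HBase code.
final class MutatorDefaultsSketch {
  // Assumed sentinel value; the real BufferedMutatorParams.UNSET is not shown in this diff.
  static final int UNSET = -1;

  // Fluent parameter holder, loosely shaped like BufferedMutatorParams.
  static final class Params {
    final String tableName;
    long writeBufferSize = UNSET;
    int maxKeyValueSize = UNSET;

    Params(String tableName) {
      this.tableName = tableName;
    }

    Params writeBufferSize(long size) {
      this.writeBufferSize = size;
      return this;
    }

    Params maxKeyValueSize(int size) {
      this.maxKeyValueSize = size;
      return this;
    }
  }

  // Validate the table name, then replace each UNSET field with the supplied default.
  static Params resolve(Params p, long defaultBufferSize, int defaultMaxKeyValueSize) {
    if (p.tableName == null) {
      throw new IllegalArgumentException("TableName cannot be null.");
    }
    if (p.writeBufferSize == UNSET) {
      p.writeBufferSize(defaultBufferSize);
    }
    if (p.maxKeyValueSize == UNSET) {
      p.maxKeyValueSize(defaultMaxKeyValueSize);
    }
    return p;
  }
}
```

Values the caller sets explicitly survive resolve() untouched; only the fields still equal to the sentinel pick up the defaults.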

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 2ef51ab..c71a669 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -116,10 +115,8 @@ public class HTable implements HTableInterface, RegionLocator {
   private final TableName tableName;
   private volatile Configuration configuration;
   private TableConfiguration tableConfiguration;
-  protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
-  private long writeBufferSize;
+  protected BufferedMutatorImpl mutator;
   private boolean autoFlush = true;
-  protected long currentWriteBufferSize = 0 ;
   private boolean closed = false;
   protected int scannerCaching;
   private ExecutorService pool;  // For Multi & Scan
@@ -128,8 +125,6 @@ public class HTable implements HTableInterface, RegionLocator {
   private final boolean cleanupConnectionOnClose; // close the connection in close()
   private Consistency defaultConsistency = Consistency.STRONG;
 
-  /** The Async process for puts with autoflush set to false or multiputs */
-  protected AsyncProcess ap;
   /** The Async process for batch */
   protected AsyncProcess multiAp;
   private RpcRetryingCallerFactory rpcCallerFactory;
@@ -221,7 +216,7 @@ public class HTable implements HTableInterface, RegionLocator {
     // it also scales when new region servers are added.
     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
-    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    pool.allowCoreThreadTimeOut(true);
     return pool;
   }
 
@@ -325,14 +320,18 @@ public class HTable implements HTableInterface, RegionLocator {
   }
 
   /**
-   * For internal testing.
+   * For internal testing. Uses Connection provided in {@code params}.
+   * @throws IOException
    */
   @VisibleForTesting
-  protected HTable() {
-    tableName = null;
-    tableConfiguration = new TableConfiguration();
+  protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
+    connection = conn;
+    tableName = params.getTableName();
+    tableConfiguration = new TableConfiguration(connection.getConfiguration());
     cleanupPoolOnClose = false;
     cleanupConnectionOnClose = false;
+    // used from tests, don't trust the connection is real
+    this.mutator = new BufferedMutatorImpl(conn, null, null, params);
   }
 
   /**
@@ -352,9 +351,6 @@ public class HTable implements HTableInterface, RegionLocator {
 
     this.operationTimeout = tableName.isSystemTable() ?
         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
-    this.writeBufferSize = tableConfiguration.getWriteBufferSize();
-    this.autoFlush = true;
-    this.currentWriteBufferSize = 0;
     this.scannerCaching = tableConfiguration.getScannerCaching();
 
     if (this.rpcCallerFactory == null) {
@@ -365,7 +361,6 @@ public class HTable implements HTableInterface, RegionLocator {
     }
 
     // puts need to track errors globally due to how the APIs currently work.
-    ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
     multiAp = this.connection.getAsyncProcess();
 
     this.closed = false;
@@ -542,7 +537,7 @@ public class HTable implements HTableInterface, RegionLocator {
    */
   @Deprecated
   public List<Row> getWriteBuffer() {
-    return writeAsyncBuffer;
+    return mutator == null ? null : mutator.getWriteBuffer();
   }
 
   /**
@@ -663,6 +658,8 @@ public class HTable implements HTableInterface, RegionLocator {
    * This is mainly useful for the MapReduce integration.
    * @return A map of HRegionInfo with it's server address
    * @throws IOException if a remote or network exception occurs
+   *
+   * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
    */
   @Override
   public List<HRegionLocation> getAllRegionLocations() throws IOException {
@@ -1023,11 +1020,11 @@ public class HTable implements HTableInterface, RegionLocator {
 
   /**
    * {@inheritDoc}
+   * @throws IOException
    */
   @Override
-  public void put(final Put put)
-      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    doPut(put);
+  public void put(final Put put) throws IOException {
+    getBufferedMutator().mutate(put);
     if (autoFlush) {
       flushCommits();
     }
@@ -1035,82 +1032,16 @@ public class HTable implements HTableInterface, RegionLocator {
 
   /**
    * {@inheritDoc}
+   * @throws IOException
    */
   @Override
-  public void put(final List<Put> puts)
-      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    for (Put put : puts) {
-      doPut(put);
-    }
+  public void put(final List<Put> puts) throws IOException {
+    getBufferedMutator().mutate(puts);
     if (autoFlush) {
       flushCommits();
     }
   }
 
-
-  /**
-   * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
-   *  cluster.
-   * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
-   * @throws InterruptedIOException if we were interrupted.
-   */
-  private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    // This behavior is highly non-intuitive... it does not protect us against
-    // 94-incompatible behavior, which is a timing issue because hasError, the below code
-    // and setter of hasError are not synchronized. Perhaps it should be removed.
-    if (ap.hasError()) {
-      writeAsyncBuffer.add(put);
-      backgroundFlushCommits(true);
-    }
-
-    validatePut(put);
-
-    currentWriteBufferSize += put.heapSize();
-    writeAsyncBuffer.add(put);
-
-    while (currentWriteBufferSize > writeBufferSize) {
-      backgroundFlushCommits(false);
-    }
-  }
-
-
-  /**
-   * Send the operations in the buffer to the servers. Does not wait for the server's answer.
-   * If the is an error (max retried reach from a previous flush or bad operation), it tries to
-   * send all operations in the buffer and sends an exception.
-   * @param synchronous - if true, sends all the writes and wait for all of them to finish before
-   *                     returning.
-   */
-  private void backgroundFlushCommits(boolean synchronous) throws
-      InterruptedIOException, RetriesExhaustedWithDetailsException {
-
-    try {
-      if (!synchronous) {
-        ap.submit(tableName, writeAsyncBuffer, true, null, false);
-        if (ap.hasError()) {
-          LOG.debug(tableName + ": One or more of the operations have failed -" +
-              " waiting for all operation in progress to finish (successfully or not)");
-        }
-      }
-      if (synchronous || ap.hasError()) {
-        while (!writeAsyncBuffer.isEmpty()) {
-          ap.submit(tableName, writeAsyncBuffer, true, null, false);
-        }
-        RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
-        if (error != null) {
-          throw error;
-        }
-      }
-    } finally {
-      currentWriteBufferSize = 0;
-      for (Row mut : writeAsyncBuffer) {
-        if (mut instanceof Mutation) {
-          currentWriteBufferSize += ((Mutation) mut).heapSize();
-        }
-      }
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
@@ -1287,7 +1218,7 @@ public class HTable implements HTableInterface, RegionLocator {
           controller.setCallTimeout(callTimeout);
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+                getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), CompareType.EQUAL, put);
             MutateResponse response = getStub().mutate(controller, request);
             return Boolean.valueOf(response.getProcessed());
@@ -1474,12 +1405,11 @@ public class HTable implements HTableInterface, RegionLocator {
 
   /**
    * {@inheritDoc}
+   * @throws IOException
    */
   @Override
-  public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-    // As we can have an operation in progress even if the buffer is empty, we call
-    //  backgroundFlushCommits at least one time.
-    backgroundFlushCommits(true);
+  public void flushCommits() throws IOException {
+    getBufferedMutator().flush();
   }
 
   /**
@@ -1599,7 +1529,11 @@ public class HTable implements HTableInterface, RegionLocator {
    */
   @Override
   public long getWriteBufferSize() {
-    return writeBufferSize;
+    if (mutator == null) {
+      return tableConfiguration.getWriteBufferSize();
+    } else {
+      return mutator.getWriteBufferSize();
+    }
   }
 
   /**
@@ -1612,10 +1546,8 @@ public class HTable implements HTableInterface, RegionLocator {
    */
   @Override
   public void setWriteBufferSize(long writeBufferSize) throws IOException {
-    this.writeBufferSize = writeBufferSize;
-    if(currentWriteBufferSize > writeBufferSize) {
-      flushCommits();
-    }
+    getBufferedMutator();
+    mutator.setWriteBufferSize(writeBufferSize);
   }
 
   /**
@@ -1929,4 +1861,17 @@ public class HTable implements HTableInterface, RegionLocator {
           callbackErrorServers);
     }
   }
+
+  @VisibleForTesting
+  BufferedMutator getBufferedMutator() throws IOException {
+    if (mutator == null) {
+      this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
+          new BufferedMutatorParams(tableName)
+              .pool(pool)
+              .writeBufferSize(tableConfiguration.getWriteBufferSize())
+              .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
+      );
+    }
+    return mutator;
+  }
 }
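
Note on the HTable.java changes above: put() now hands each edit to the mutator and flushes only when autoFlush is set, while the size-threshold logic of the removed doPut() moves out of HTable. The sketch below models that division of labour with plain Java types. WriteBufferSketch, its byte[] edits and the flushedBatchSizes log are hypothetical; edit.length stands in for Put.heapSize(), flushing in close() is an assumption about the mutator's contract that this diff does not show, and the class is single-threaded, unlike the BufferedMutator described in the commit message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of a size-bounded write buffer; not HBase code.
final class WriteBufferSketch implements AutoCloseable {
  private final long writeBufferSize;
  private final List<byte[]> buffer = new ArrayList<>();
  private long currentSize = 0;

  // One entry per simulated batch sent, holding the number of edits in that batch.
  final List<Integer> flushedBatchSizes = new ArrayList<>();

  WriteBufferSketch(long writeBufferSize) {
    this.writeBufferSize = writeBufferSize;
  }

  // Buffer the edit; send the batch once the accumulated size exceeds the limit.
  void mutate(byte[] edit) {
    buffer.add(edit);
    currentSize += edit.length; // stand-in for Put.heapSize()
    if (currentSize > writeBufferSize) {
      flush();
    }
  }

  // Shaped like the new HTable.put(Put): delegate, then flush if autoFlush is on.
  void put(byte[] edit, boolean autoFlush) {
    mutate(edit);
    if (autoFlush) {
      flush();
    }
  }

  // Send whatever is buffered as one batch; a no-op when the buffer is empty.
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    flushedBatchSizes.add(buffer.size());
    buffer.clear();
    currentSize = 0;
  }

  // Assumption: closing flushes any remaining edits, so try-with-resources loses nothing.
  @Override
  public void close() {
    flush();
  }
}
```

With a 10-byte limit and 4-byte edits, the third mutate() pushes the total to 12 bytes and triggers a batch of three; an edit added afterwards is sent when the sketch is closed.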

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
index 911e034..1f4d99a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
@@ -67,8 +67,8 @@ public interface HTableInterface extends Table {
    *          Whether or not to enable 'auto-flush'.
    * @deprecated in 0.96. When called with setAutoFlush(false), this function also
    *  set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
-   *  Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
-   *  {@link #setAutoFlushTo(boolean)} for all other cases.
+   *  Replace it with setAutoFlush(false, false) if this is exactly what you want, though
+   *  this is the method you want for most cases.
    */
   @Deprecated
   void setAutoFlush(boolean autoFlush);
@@ -105,13 +105,69 @@ public interface HTableInterface extends Table {
    *          the value of this parameter is ignored and clearBufferOnFail is set to true.
    *          Setting clearBufferOnFail to false is deprecated since 0.96.
    * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
-   *  {@link #setAutoFlushTo(boolean)}} instead.
-   * @see #flushCommits
+   *  {@link #setAutoFlush(boolean)}} instead.
+   * @see BufferedMutator#flush()
    */
   @Deprecated
   void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail);
 
   /**
+   * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}.
+   * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use
+   * {@link #setAutoFlush(boolean)} instead, or better still, move on to {@link BufferedMutator}
+   */
+  @Deprecated
+  void setAutoFlushTo(boolean autoFlush);
+  
+  /**
+   * Tells whether or not 'auto-flush' is turned on.
+   *
+   * @return {@code true} if 'auto-flush' is enabled (default), meaning
+   * {@link Put} operations don't get buffered/delayed and are immediately
+   * executed.
+   * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator}
+   */
+  @Deprecated
+  boolean isAutoFlush();
+
+  /**
+   * Executes all the buffered {@link Put} operations.
+   * <p>
+   * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when <code>put(List<Put>)</code> is used) when
+   * {@link #isAutoFlush} is {@code true}.
+   * @throws IOException if a remote or network exception occurs.
+   * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#flush()}
+   */
+  @Deprecated
+  void flushCommits() throws IOException;
+
+  /**
+   * Returns the maximum size in bytes of the write buffer for this HTable.
+   * <p>
+   * The default value comes from the configuration parameter
+   * {@code hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#getWriteBufferSize()}
+   */
+  @Deprecated
+  long getWriteBufferSize();
+
+  /**
+   * Sets the size of the buffer in bytes.
+   * <p>
+   * If the new size is less than the current amount of data in the
+   * write buffer, the buffer gets flushed.
+   * @param writeBufferSize The new write buffer size, in bytes.
+   * @throws IOException if a remote or network exception occurs.
+   * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} and
+   * {@link BufferedMutatorParams#writeBufferSize(long)}
+   */
+  @Deprecated
+  void setWriteBufferSize(long writeBufferSize) throws IOException;
+
+
+  /**
    * Return the row that matches <i>row</i> exactly,
    * or the one that immediately precedes it.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 07e4c08..dae8a25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -219,9 +219,7 @@ public interface Table extends Closeable {
 
   /**
    * Puts some data in the table.
-   * <p>
-   * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
-   * until the internal buffer is full.
+   * 
    * @param put The data to put.
    * @throws IOException if a remote or network exception occurs.
    * @since 0.20.0
@@ -231,9 +229,6 @@ public interface Table extends Closeable {
   /**
    * Puts some data in the table, in batch.
    * <p>
-   * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
-   * until the internal buffer is full.
-   * <p>
    * This can be used for group commit, or for submitting user defined
    * batches.  The writeBuffer will be periodically inspected while the List
    * is processed, so depending on the List size the writeBuffer may flush
@@ -498,30 +493,6 @@ public interface Table extends Closeable {
     final Batch.Callback<R> callback) throws ServiceException, Throwable;
 
   /**
-   * Tells whether or not 'auto-flush' is turned on.
-   *
-   * @return {@code true} if 'auto-flush' is enabled (default), meaning
-   * {@link Put} operations don't get buffered/delayed and are immediately
-   * executed.
-   */
-  boolean isAutoFlush();
-
-  /**
-   * Executes all the buffered {@link Put} operations.
-   * <p>
-   * This method gets called once automatically for every {@link Put} or batch
-   * of {@link Put}s (when <code>put(List<Put>)</code> is used) when
-   * {@link #isAutoFlush} is {@code true}.
-   * @throws IOException if a remote or network exception occurs.
-   */
-  void flushCommits() throws IOException;
-
-  /**
-   * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}
-   */
-  void setAutoFlushTo(boolean autoFlush);
-
-  /**
    * Returns the maximum size in bytes of the write buffer for this HTable.
    * <p>
    * The default value comes from the configuration parameter
@@ -540,7 +511,6 @@ public interface Table extends Closeable {
    */
   void setWriteBufferSize(long writeBufferSize) throws IOException;
 
-
   /**
    * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
    * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
index 6176a0c..70ad179 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java
@@ -28,20 +28,18 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class TableConfiguration {
 
-  private final long writeBufferSize;
+  public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
+  public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
+  public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
+  public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
 
+  private final long writeBufferSize;
   private final int metaOperationTimeout;
-
   private final int operationTimeout;
-
   private final int scannerCaching;
-
   private final int primaryCallTimeoutMicroSecond;
-
   private final int replicaCallTimeoutMicroSecondScan;
-
   private final int retries;
-
   private final int maxKeyValueSize;
 
   /**
@@ -49,7 +47,7 @@ public class TableConfiguration {
    * @param conf Configuration object
    */
   TableConfiguration(Configuration conf) {
-    this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
+    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
 
     this.metaOperationTimeout = conf.getInt(
       HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
@@ -70,7 +68,7 @@ public class TableConfiguration {
     this.retries = conf.getInt(
        HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
-    this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
+    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
   }
 
   /**
@@ -80,14 +78,14 @@ public class TableConfiguration {
    */
   @VisibleForTesting
   protected TableConfiguration() {
-    this.writeBufferSize = 2097152;
+    this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
     this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
     this.primaryCallTimeoutMicroSecond = 10000;
     this.replicaCallTimeoutMicroSecondScan = 1000000;
     this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
-    this.maxKeyValueSize = -1;
+    this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
   }
 
   public long getWriteBufferSize() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 97c3c37..8c6a4ce 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -154,8 +154,8 @@ public class TestAsyncProcess {
           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
     }
 
-    public MyAsyncProcess(
-        ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
+    public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
+        @SuppressWarnings("unused") boolean dummy) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
               new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
         @Override
@@ -643,26 +643,27 @@ public class TestAsyncProcess {
     NonceGenerator ng = Mockito.mock(NonceGenerator.class);
     Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
     Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
+    Mockito.when(hc.getConfiguration()).thenReturn(conf);
     return hc;
   }
 
   @Test
   public void testHTablePutSuccess() throws Exception {
-    HTable ht = Mockito.mock(HTable.class);
+    BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
     ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
 
     Put put = createPut(1, true);
 
     Assert.assertEquals(0, ht.getWriteBufferSize());
-    ht.put(put);
+    ht.mutate(put);
     Assert.assertEquals(0, ht.getWriteBufferSize());
   }
 
   private void doHTableFailedPut(boolean bufferOn) throws Exception {
-    HTable ht = new HTable();
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
-    ht.ap = ap;
-    ht.setAutoFlushTo(true);
+    ClusterConnection conn = createHConnection();
+    HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
+    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+    ht.mutator.ap = ap;
     if (bufferOn) {
       ht.setWriteBufferSize(1024L * 1024L);
     } else {
@@ -671,7 +672,7 @@ public class TestAsyncProcess {
 
     Put put = createPut(1, false);
 
-    Assert.assertEquals(0L, ht.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
     try {
       ht.put(put);
       if (bufferOn) {
@@ -680,7 +681,7 @@ public class TestAsyncProcess {
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals(0L, ht.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
     // The table should have sent one request, maybe after multiple attempts
     AsyncRequestFuture ars = null;
     for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -707,16 +708,14 @@ public class TestAsyncProcess {
 
   @Test
   public void testHTableFailedPutAndNewPut() throws Exception {
-    HTable ht = new HTable();
-    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
-    ht.ap = ap;
-    // This is deprecated method. Using it here only because the new HTable above is a bit of a
-    // perversion skirting a bunch of setup.  Fix the HTable test-only constructor to do more.
-    ht.setAutoFlush(false, true);
-    ht.setWriteBufferSize(0);
+    ClusterConnection conn = createHConnection();
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
+        new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
+    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+    mutator.ap = ap;
 
     Put p = createPut(1, false);
-    ht.put(p);
+    mutator.mutate(p);
 
     ap.waitUntilDone(); // Let's do all the retries.
 
@@ -726,13 +725,13 @@ public class TestAsyncProcess {
     //  puts, we may raise an exception in the middle of the list. It's then up to the caller to
     //  manage what was inserted, what was tried but failed, and what was not even tried.
     p = createPut(1, true);
-    Assert.assertEquals(0, ht.writeAsyncBuffer.size());
+    Assert.assertEquals(0, mutator.getWriteBuffer().size());
     try {
-      ht.put(p);
+      mutator.mutate(p);
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
+    Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
   }
 
 
@@ -763,9 +762,9 @@ public class TestAsyncProcess {
 
   @Test
   public void testBatch() throws IOException, InterruptedException {
-    HTable ht = new HTable();
-    ht.connection = new MyConnectionImpl(conf);
-    ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
+    ClusterConnection conn = new MyConnectionImpl(conf);
+    HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
+    ht.multiAp = new MyAsyncProcess(conn, conf, false);
 
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
@@ -794,26 +793,24 @@ public class TestAsyncProcess {
 
   @Test
   public void testErrorsServers() throws IOException {
-    HTable ht = new HTable();
     Configuration configuration = new Configuration(conf);
+    ClusterConnection conn = new MyConnectionImpl(configuration);
+    BufferedMutatorImpl mutator =
+        new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
     configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
-    // set default writeBufferSize
-    ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
 
-    ht.connection = new MyConnectionImpl(configuration);
-    MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
-    ht.ap = ap;
+    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
+    mutator.ap = ap;
 
-    Assert.assertNotNull(ht.ap.createServerErrorTracker());
-    Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
-    ht.ap.serverTrackerTimeout = 1;
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+    Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
+    mutator.ap.serverTrackerTimeout = 1;
 
     Put p = createPut(1, false);
-    ht.setAutoFlushTo(false);
-    ht.put(p);
+    mutator.mutate(p);
 
     try {
-      ht.flushCommits();
+      mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
@@ -823,19 +820,18 @@ public class TestAsyncProcess {
 
   @Test
   public void testGlobalErrors() throws IOException {
-    HTable ht = new HTable();
-    ht.connection = new MyConnectionImpl(conf);
-    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
-    ht.ap = ap;
+    ClusterConnection conn = new MyConnectionImpl(conf);
+    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf);
+    mutator.ap = ap;
 
-    Assert.assertNotNull(ht.ap.createServerErrorTracker());
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
 
     Put p = createPut(1, true);
-    ht.setAutoFlushTo(false);
-    ht.put(p);
+    mutator.mutate(p);
 
     try {
-      ht.flushCommits();
+      mutator.flush();
       Assert.fail();
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
@@ -862,13 +858,12 @@ public class TestAsyncProcess {
       gets.add(get);
     }
 
-    HTable ht = new HTable();
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
-    ht.connection = con;
+    HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
     MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
     ht.multiAp = ap;
 
-    ht.batch(gets);
+    ht.batch(gets, new Object[gets.size()]);
 
     Assert.assertEquals(ap.nbActions.get(), NB_REGS);
     Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index b1ef5b9..2d50c1b 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -708,36 +708,47 @@ public class TestClientNoCluster extends Configured implements Tool {
    * @throws IOException
    */
   static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
-    Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE));
-    table.setAutoFlushTo(false);
     long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
     long startTime = System.currentTimeMillis();
     final int printInterval = 100000;
     Random rd = new Random(id);
     boolean get = c.getBoolean("hbase.test.do.gets", false);
-    try {
-      Stopwatch stopWatch = new Stopwatch();
-      stopWatch.start();
-      for (int i = 0; i < namespaceSpan; i++) {
-        byte [] b = format(rd.nextLong());
-        if (get){
+    TableName tableName = TableName.valueOf(BIG_USER_TABLE);
+    if (get) {
+      try (Table table = sharedConnection.getTable(tableName)){
+        Stopwatch stopWatch = new Stopwatch();
+        stopWatch.start();
+        for (int i = 0; i < namespaceSpan; i++) {
+          byte [] b = format(rd.nextLong());
           Get g = new Get(b);
           table.get(g);
-        } else {
+          if (i % printInterval == 0) {
+            LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
+            stopWatch.reset();
+            stopWatch.start();
+          }
+        }
+        LOG.info("Finished a cycle getting " + namespaceSpan + " in " +
+            (System.currentTimeMillis() - startTime) + "ms");
+      }
+    } else {
+      try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
+        Stopwatch stopWatch = new Stopwatch();
+        stopWatch.start();
+        for (int i = 0; i < namespaceSpan; i++) {
+          byte [] b = format(rd.nextLong());
           Put p = new Put(b);
           p.add(HConstants.CATALOG_FAMILY, b, b);
-          table.put(p);
+          mutator.mutate(p);
+          if (i % printInterval == 0) {
+            LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
+            stopWatch.reset();
+            stopWatch.start();
+          }
         }
-        if (i % printInterval == 0) {
-          LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
-          stopWatch.reset();
-          stopWatch.start();
+        LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
+            (System.currentTimeMillis() - startTime) + "ms");
         }
-      }
-      LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
-          (System.currentTimeMillis() - startTime) + "ms");
-    } finally {
-      table.close();
     }
   }
 

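The hunk above replaces `table.put(p)` on an auto-flush-disabled table with `mutator.mutate(p)`. As a rough illustration of the write-buffer idea only, here is a minimal sketch in plain Java. `SketchBufferedWriter`, its byte-count threshold, and the `batchSizes` list are hypothetical names made up for this note; this is not the HBase `BufferedMutator` implementation, and the real flush policy may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a buffered writer; NOT the HBase BufferedMutator. */
class SketchBufferedWriter implements AutoCloseable {
  private final long maxBufferedBytes;
  private final List<byte[]> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  /** Size of every batch "sent" so far; a real client would issue an RPC instead. */
  final List<Integer> batchSizes = new ArrayList<>();

  SketchBufferedWriter(long maxBufferedBytes) {
    this.maxBufferedBytes = maxBufferedBytes;
  }

  /** Buffer one edit and flush once the assumed size threshold is reached. */
  synchronized void mutate(byte[] edit) {
    buffer.add(edit);
    bufferedBytes += edit.length;
    if (bufferedBytes >= maxBufferedBytes) {
      flush();
    }
  }

  /** Send everything buffered so far as a single batch. */
  synchronized void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    batchSizes.add(buffer.size());
    buffer.clear();
    bufferedBytes = 0;
  }

  /** Closing flushes whatever is still buffered. */
  @Override
  public synchronized void close() {
    flush();
  }
}
```

With a 10-byte threshold and five 4-byte edits, the sketch sends one batch when the third edit arrives and a second batch on `close()`.
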
http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java
new file mode 100644
index 0000000..ab96741
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An example of using the {@link BufferedMutator} interface.
+ */
+public class BufferedMutatorExample extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(BufferedMutatorExample.class);
+
+  private static final int POOL_SIZE = 10;
+  private static final int TASK_COUNT = 100;
+  private static final TableName TABLE = TableName.valueOf("foo");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+
+  @Override
+  public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
+
+    /** a callback invoked when an asynchronous write fails. */
+    final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
+      @Override
+      public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
+        for (int i = 0; i < e.getNumExceptions(); i++) {
+          LOG.info("Failed to send put " + e.getRow(i) + ".");
+        }
+      }
+    };
+    BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
+        .listener(listener);
+
+    //
+    // step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
+    //
+    try (final Connection conn = ConnectionFactory.createConnection(getConf());
+         final BufferedMutator mutator = conn.getBufferedMutator(params)) {
+
+      /** worker pool that operates on the shared BufferedMutator instance */
+      final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE);
+      List<Future<Void>> futures = new ArrayList<>(TASK_COUNT);
+
+      for (int i = 0; i < TASK_COUNT; i++) {
+        futures.add(workerPool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            //
+            // step 2: each worker sends edits to the shared BufferedMutator instance. They all use
+            // the same backing buffer, call-back "listener", and RPC executor pool.
+            //
+            Put p = new Put(Bytes.toBytes("someRow"));
+            p.add(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
+            mutator.mutate(p);
+            // do work... maybe you want to call mutator.flush() after many edits to ensure any of
+            // this worker's edits are sent before exiting the Callable
+            return null;
+          }
+        }));
+      }
+
+      //
+      // step 3: clean up the worker pool, shut down.
+      //
+      for (Future<Void> f : futures) {
+        f.get(5, TimeUnit.MINUTES);
+      }
+      workerPool.shutdown();
+    } catch (IOException e) {
+      // exception while creating/destroying Connection or BufferedMutator
+      LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
+    } // BufferedMutator.close() ensures all work is flushed. The custom listener may be
+      // invoked from here.
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new BufferedMutatorExample(), args);
+  }
+}
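
The example above shares one mutator and one exception listener across a pool of workers. The following plain-Java sketch mimics that shape without any HBase dependency, so it can be run in isolation. Everything in it is an assumption made for illustration: `SharedBufferSketch`, `FailureListener`, the rule that an empty edit counts as a failed write, and the flush threshold of 8 edits. It does not reflect how `BufferedMutator` reports failures internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shared write buffer with a failure callback; NOT HBase code. */
class SharedBufferSketch implements AutoCloseable {
  /** Hypothetical analogue of an exception listener. */
  interface FailureListener {
    void onFailure(String edit, Exception cause);
  }

  private final int maxEdits;
  private final FailureListener listener;
  private final List<String> buffer = new ArrayList<>();
  final AtomicInteger sent = new AtomicInteger();

  SharedBufferSketch(int maxEdits, FailureListener listener) {
    this.maxEdits = maxEdits;
    this.listener = listener;
  }

  /** Called concurrently by workers; all of them append to the same buffer. */
  synchronized void mutate(String edit) {
    buffer.add(edit);
    if (buffer.size() >= maxEdits) {
      flush();
    }
  }

  /** "Send" the batch; empty edits are treated as failures (an assumption). */
  synchronized void flush() {
    for (String edit : buffer) {
      if (edit.isEmpty()) {
        listener.onFailure(edit, new IllegalArgumentException("empty edit"));
      } else {
        sent.incrementAndGet();
      }
    }
    buffer.clear();
  }

  @Override
  public synchronized void close() {
    flush();
  }

  /** Runs taskCount single-edit tasks; returns {edits sent, failures reported}. */
  static int[] runWorkers(int poolSize, int taskCount) {
    final AtomicInteger reported = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    SharedBufferSketch shared =
        new SharedBufferSketch(8, (edit, cause) -> reported.incrementAndGet());
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        // Every tenth task submits an empty edit to exercise the listener.
        final String edit = (i % 10 == 0) ? "" : "row-" + i;
        futures.add(pool.submit(() -> shared.mutate(edit)));
      }
      for (Future<?> f : futures) {
        f.get(1, TimeUnit.MINUTES);
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
      shared.close(); // the final flush may still invoke the listener
    }
    return new int[] { shared.sent.get(), reported.get() };
  }
}
```

Calling `SharedBufferSketch.runWorkers(4, 100)` sends 90 edits and reports 10 failures, regardless of how the tasks interleave, because every edit passes through the same synchronized buffer and the buffer is flushed once more on close.
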

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 0a62966..bd13800 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -53,6 +53,10 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -340,7 +344,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       byte[] id;
       long count = 0;
       int i;
-      Table table;
+      BufferedMutator mutator;
+      Connection connection;
       long numNodes;
       long wrap;
       int width;
@@ -349,7 +354,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       protected void setup(Context context) throws IOException, InterruptedException {
         id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
         Configuration conf = context.getConfiguration();
-        instantiateHTable(conf);
+        connection = ConnectionFactory.createConnection(conf);
+        instantiateHTable();
         this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
         current = new byte[this.width][];
         int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
@@ -361,15 +367,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         }
       }
 
-      protected void instantiateHTable(Configuration conf) throws IOException {
-        table = new HTable(conf, getTableName(conf));
-        table.setAutoFlushTo(false);
-        table.setWriteBufferSize(4 * 1024 * 1024);
+      protected void instantiateHTable() throws IOException {
+        mutator = connection.getBufferedMutator(
+            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
+                .writeBufferSize(4 * 1024 * 1024));
       }
 
       @Override
       protected void cleanup(Context context) throws IOException ,InterruptedException {
-        table.close();
+        mutator.close();
+        connection.close();
       }
 
       @Override
@@ -419,7 +426,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           if (id != null) {
             put.add(FAMILY_NAME, COLUMN_CLIENT, id);
           }
-          table.put(put);
+          mutator.mutate(put);
 
           if (i % 1000 == 0) {
             // Tickle progress every so often else maprunner will think us hung
@@ -427,7 +434,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           }
         }
 
-        table.flushCommits();
+        mutator.flush();
       }
     }
 

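In the hunks above, `setup` creates the `Connection` before the `BufferedMutator`, and `cleanup` closes them in the opposite order. Where both objects live in one scope, a try-with-resources statement gives the same ordering automatically. The sketch below demonstrates only that Java language behavior; `CloseOrderSketch` and `Recorded` are hypothetical names and stand in for the real HBase types.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {
  /** Hypothetical resource that records its name when closed. */
  static final class Recorded implements AutoCloseable {
    private final String name;
    private final List<String> log;

    Recorded(String name, List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void close() {
      log.add(name);
    }
  }

  /** Opens "connection" then "mutator"; resources close in reverse order. */
  static List<String> run() {
    List<String> log = new ArrayList<>();
    try (Recorded connection = new Recorded("connection", log);
         Recorded mutator = new Recorded("mutator", log)) {
      log.add("write"); // stands in for mutate(...) calls
    }
    return log;
  }
}
```

`CloseOrderSketch.run()` returns `[write, mutator, connection]`: the resource declared last is closed first, so buffered work is flushed while the connection is still open.
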
http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
index e702805..c68ce4d 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -182,10 +181,9 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
       }
 
       @Override
-      protected void instantiateHTable(Configuration conf) throws IOException {
+      protected void instantiateHTable() throws IOException {
         for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
-          Table table = new HTable(conf, getTableName(i));
-          table.setAutoFlushTo(true);
+          Table table = connection.getTable(getTableName(i));
           //table.setWriteBufferSize(4 * 1024 * 1024);
           this.tables[i] = table;
         }
@@ -231,9 +229,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
             output.progress();
           }
         }
-        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
-          tables[j].flushCommits();
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
index 8a7e9f1..8a0181c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java
@@ -35,11 +35,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -111,8 +113,6 @@ public class IntegrationTestLoadAndVerify  extends IntegrationTestBase  {
 
   private static final int SCANNER_CACHING = 500;
 
-  protected IntegrationTestingUtility util;
-
   private String toRun = null;
 
   private enum Counters {
@@ -165,7 +165,8 @@ public void cleanUpCluster() throws Exception {
       extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
   {
     protected long recordsToWrite;
-    protected Table table;
+    protected Connection connection;
+    protected BufferedMutator mutator;
     protected Configuration conf;
     protected int numBackReferencesPerRow;
     protected String shortTaskId;
@@ -180,9 +181,10 @@ public void cleanUpCluster() throws Exception {
       recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
       String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
       numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
-      table = new HTable(conf, TableName.valueOf(tableName));
-      table.setWriteBufferSize(4*1024*1024);
-      table.setAutoFlushTo(false);
+      this.connection = ConnectionFactory.createConnection(conf);
+      mutator = connection.getBufferedMutator(
+          new BufferedMutatorParams(TableName.valueOf(tableName))
+              .writeBufferSize(4 * 1024 * 1024));
 
       String taskId = conf.get("mapreduce.task.attempt.id");
       Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
@@ -197,8 +199,8 @@ public void cleanUpCluster() throws Exception {
 
     @Override
     public void cleanup(Context context) throws IOException {
-      table.flushCommits();
-      table.close();
+      mutator.close();
+      connection.close();
     }
 
     @Override
@@ -230,7 +232,7 @@ public void cleanUpCluster() throws Exception {
             refsWritten.increment(1);
           }
           rowsWritten.increment(1);
-          table.put(p);
+          mutator.mutate(p);
 
           if (i % 100 == 0) {
             context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@@ -239,7 +241,7 @@ public void cleanUpCluster() throws Exception {
         }
         // End of block, flush all of them before we start writing anything
         // pointing to these!
-        table.flushCommits();
+        mutator.flush();
       }
     }
   }
@@ -315,7 +317,7 @@ public void cleanUpCluster() throws Exception {
     NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
     conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
 
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
     job.setJarByClass(this.getClass());
     setMapperClass(job);
@@ -339,7 +341,7 @@ public void cleanUpCluster() throws Exception {
   protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "verify-output");
 
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
     job.setJarByClass(this.getClass());
     job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
     setJobScannerConf(job);
@@ -393,7 +395,7 @@ public void cleanUpCluster() throws Exception {
 
     // Only disable and drop if we succeeded to verify - otherwise it's useful
     // to leave it around for post-mortem
-    getTestingUtil(getConf()).deleteTable(htd.getName());
+    getTestingUtil(getConf()).deleteTable(htd.getTableName());
   }
 
   public void usage() {
@@ -449,15 +451,17 @@ public void cleanUpCluster() throws Exception {
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
 
-    Admin admin = new HBaseAdmin(getConf());
-    if (doLoad) {
-      admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
-      doLoad(getConf(), htd);
+    try (Connection conn = ConnectionFactory.createConnection(getConf());
+        Admin admin = conn.getAdmin()) {
+      if (doLoad) {
+        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
+        doLoad(getConf(), htd);
+      }
     }
     if (doVerify) {
       doVerify(getConf(), htd);
       if (doDelete) {
-        getTestingUtil(getConf()).deleteTable(htd.getName());
+        getTestingUtil(getConf()).deleteTable(htd.getTableName());
       }
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
index ea9b228..e68cb38 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java
@@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
           p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
           p.setCellVisibility(new CellVisibility(exp));
           getCounter(expIdx).increment(1);
-          table.put(p);
+          mutator.mutate(p);
 
           if (i % 100 == 0) {
             context.setStatus("Written " + i + "/" + recordsToWrite + " records");
@@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
         }
         // End of block, flush all of them before we start writing anything
         // pointing to these!
-        table.flushCommits();
+        mutator.flush();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
index d942d9c..db49b67 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
@@ -22,11 +22,12 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -234,12 +235,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
 
   private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
     LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
-    Table ht = new HTable(util.getConfiguration(), this.tableName);
+    BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName);
     byte[] value = new byte[300];
     for (int x = 0; x < 5000; x++) {
       TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
       try {
-        ht.setAutoFlushTo(false);
         for (int i = 0; i < 5; i++) {
           long rk = random.nextLong();
           rowKeys.add(rk);
@@ -248,7 +248,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
             random.nextBytes(value);
             p.add(familyName, Bytes.toBytes(random.nextLong()), value);
           }
-          ht.put(p);
+          ht.mutate(p);
         }
         if ((x % 1000) == 0) {
           admin.flush(tableName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
index 5836442..dad5a32 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java
@@ -227,7 +227,6 @@ public class RowResource extends ResourceBase {
       }
       table = servlet.getTable(tableResource.getName());
       table.put(puts);
-      table.flushCommits();
       ResponseBuilder response = Response.ok();
       servlet.getMetrics().incrementSucessfulPutRequests(1);
       return response.build();
@@ -489,7 +488,6 @@ public class RowResource extends ResourceBase {
           .type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF)
           .build();
       }
-      table.flushCommits();
       ResponseBuilder response = Response.ok();
       servlet.getMetrics().incrementSucessfulPutRequests(1);
       return response.build();
@@ -580,7 +578,6 @@ public class RowResource extends ResourceBase {
             .type(MIMETYPE_TEXT).entity(" Delete check failed." + CRLF)
             .build();
       }
-      table.flushCommits();
       ResponseBuilder response = Response.ok();
       servlet.getMetrics().incrementSucessfulDeleteRequests(1);
       return response.build();

