hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-19673 Backport "HBASE-19486 Periodically ensure records are not buffered too long by BufferedMutator" to branch-1
Date Sat, 06 Jan 2018 08:17:38 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 b528cd01b -> 42c8b5bec


HBASE-19673 Backport "HBASE-19486 Periodically ensure records are not buffered too long by
BufferedMutator" to branch-1

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/42c8b5be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/42c8b5be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/42c8b5be

Branch: refs/heads/branch-1
Commit: 42c8b5bec5cde489318691c5d97a8a1beecc88ae
Parents: b528cd0
Author: Niels Basjes <nbasjes@bol.com>
Authored: Sat Jan 6 15:49:43 2018 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Sat Jan 6 16:15:13 2018 +0800

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 160 +++++++++++++++--
 .../hbase/client/BufferedMutatorParams.java     |  49 +++++-
 .../hbase/client/ConnectionConfiguration.java   |  29 ++-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 122 ++++++++++++-
 .../hbase/client/TestBufferedMutatorParams.java | 175 +++++++++++++++++++
 5 files changed, 519 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/42c8b5be/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 1974be3..638955e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -15,27 +15,29 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
  * <p>
@@ -75,8 +77,19 @@ public class BufferedMutatorImpl implements BufferedMutator {
    * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
    */
   @VisibleForTesting
-  AtomicInteger undealtMutationCount = new AtomicInteger(0);
+  final AtomicInteger undealtMutationCount = new AtomicInteger(0);
   private long writeBufferSize;
+  /**
+   * Having the timer tick run more often that once every 100ms is needless and will
+   * probably cause too many timer events firing having a negative impact on performance.
+   */
+  public static final long MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = 100;
+
+  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
+  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
+          new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+  private Timer writeBufferPeriodicFlushTimer = null;
+
   private final int maxKeyValueSize;
   private boolean closed = false;
   private final ExecutorService pool;
@@ -101,6 +114,20 @@ public class BufferedMutatorImpl implements BufferedMutator {
     ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
     this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
         params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+
+    // Set via the setter because it does value validation and starts/stops the TimerTask
+    long newWriteBufferPeriodicFlushTimeoutMs =
+            params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
+                    ? params.getWriteBufferPeriodicFlushTimeoutMs()
+                    : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
+    long newWriteBufferPeriodicFlushTimerTickMs =
+            params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
+                    ? params.getWriteBufferPeriodicFlushTimerTickMs()
+                    : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
+    this.setWriteBufferPeriodicFlush(
+            newWriteBufferPeriodicFlushTimeoutMs,
+            newWriteBufferPeriodicFlushTimerTickMs);
+
     this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
         params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
 
@@ -148,6 +175,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
       ++toAddCount;
     }
 
+    if (currentWriteBufferSize.get() == 0) {
+      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
+    }
+
     // This behavior is highly non-intuitive... it does not protect us against
     // 94-incompatible behavior, which is a timing issue because hasError, the below code
     // and setter of hasError are not synchronized. Perhaps it should be removed.
@@ -169,6 +200,31 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
   }
 
+  @VisibleForTesting
+  protected long getExecutedWriteBufferPeriodicFlushes() {
+    return executedWriteBufferPeriodicFlushes.get();
+  }
+
+  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
+  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
+
+  private void timerCallbackForWriteBufferPeriodicFlush() {
+    if (currentWriteBufferSize.get() == 0) {
+      return; // Nothing to flush
+    }
+    long now = System.currentTimeMillis();
+    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() >
now) {
+      return; // No need to flush yet
+    }
+    // The first record in the writebuffer has been in there too long --> flush
+    try {
+      executedWriteBufferPeriodicFlushes.incrementAndGet();
+      flush();
+    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
+    }
+  }
+
   // validate for well-formedness
   public void validatePut(final Put put) throws IllegalArgumentException {
     HTable.validatePut(put, maxKeyValueSize);
@@ -180,6 +236,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
       if (this.closed) {
         return;
       }
+
+      // Stop any running Periodic Flush timer.
+      disableWriteBufferPeriodicFlush();
+
       // As we can have an operation in progress even if the buffer is empty, we call
       // backgroundFlushCommits at least one time.
       backgroundFlushCommits(true);
@@ -284,6 +344,81 @@ public class BufferedMutatorImpl implements BufferedMutator {
     return this.writeBufferSize;
   }
 
+  /**
+   * Sets the maximum time before the buffer is automatically flushed.
+   * @param timeoutMs    The maximum number of milliseconds how long records may be buffered
+   *                     before they are flushed. Set to 0 to disable.
+   * @param timerTickMs  The number of milliseconds between each check if the
+   *                     timeout has been exceeded. Must be 100ms (as defined in
+   *                     {@link #MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS})
+   *                     or larger to avoid performance problems.
+   */
+  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs)
{
+    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs.get();
+    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
+
+    // Both parameters have minimal values.
+    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
+    writeBufferPeriodicFlushTimerTickMs.set(
+            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
+
+    // If something changed we stop the old Timer.
+    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
+        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
+      if (writeBufferPeriodicFlushTimer != null) {
+        writeBufferPeriodicFlushTimer.cancel();
+        writeBufferPeriodicFlushTimer = null;
+      }
+    }
+
+    // If we have the need for a timer and there is none we start it
+    if (writeBufferPeriodicFlushTimer == null &&
+        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
+      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
+      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
+        }
+      }, writeBufferPeriodicFlushTimerTickMs.get(),
+         writeBufferPeriodicFlushTimerTickMs.get());
+    }
+  }
+
+  /**
+   * Disable periodic flushing of the write buffer.
+   */
+  public void disableWriteBufferPeriodicFlush() {
+    setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+  }
+
+  /**
+   * Sets the maximum time before the buffer is automatically flushed checking once per second.
+   * @param timeoutMs    The maximum number of milliseconds how long records may be buffered
+   *                     before they are flushed. Set to 0 to disable.
+   */
+  public void setWriteBufferPeriodicFlush(long timeoutMs) {
+    setWriteBufferPeriodicFlush(timeoutMs, 1000L);
+  }
+
+  /**
+   * Returns the current periodic flush timeout value in milliseconds.
+   * @return The maximum number of milliseconds how long records may be buffered before they
+   *   are flushed. The value 0 means this is disabled.
+   */
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return writeBufferPeriodicFlushTimeoutMs.get();
+  }
+
+  /**
+   * Returns the current periodic flush timertick interval in milliseconds.
+   * @return The number of milliseconds between each check if the timeout has been exceeded.
+   *   This value only has a real meaning if the timeout has been set to > 0
+   */
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return writeBufferPeriodicFlushTimerTickMs.get();
+  }
+
   public void setRpcTimeout(int writeRpcTimeout) {
     this.writeRpcTimeout = writeRpcTimeout;
     this.ap.setRpcTimeout(writeRpcTimeout);
@@ -294,6 +429,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
     this.ap.setOperationTimeout(operationTimeout);
   }
 
+  @VisibleForTesting
+  long getCurrentWriteBufferSize() {
+    return currentWriteBufferSize.get();
+  }
+
   /**
    * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should
not beÓ
    * called from production uses.

http://git-wip-us.apache.org/repos/asf/hbase/blob/42c8b5be/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index d4cdead..1ad9dbe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.ExecutorService;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -30,12 +29,14 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class BufferedMutatorParams {
+public class BufferedMutatorParams implements Cloneable {
 
   static final int UNSET = -1;
 
   private final TableName tableName;
   private long writeBufferSize = UNSET;
+  private long writeBufferPeriodicFlushTimeoutMs = UNSET;
+  private long writeBufferPeriodicFlushTimerTickMs = UNSET;
   private int maxKeyValueSize = UNSET;
   private ExecutorService pool = null;
   private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener()
{
@@ -69,6 +70,30 @@ public class BufferedMutatorParams {
     return this;
   }
 
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return writeBufferPeriodicFlushTimeoutMs;
+  }
+
+  /**
+   * Set the max timeout before the buffer is automatically flushed.
+   */
+  public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs) {
+    this.writeBufferPeriodicFlushTimeoutMs = timeoutMs;
+    return this;
+  }
+
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return writeBufferPeriodicFlushTimerTickMs;
+  }
+
+  /**
+   * Set the TimerTick how often the buffer timeout if checked.
+   */
+  public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
+    this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
+    return this;
+  }
+
   public int getMaxKeyValueSize() {
     return maxKeyValueSize;
   }
@@ -107,4 +132,24 @@ public class BufferedMutatorParams {
     this.listener = listener;
     return this;
   }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see java.lang.Object#clone()
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
+          justification="The clone below is complete")
+  @Override
+  public BufferedMutatorParams clone() {
+    BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName);
+    clone.writeBufferSize                     = this.writeBufferSize;
+    clone.writeBufferPeriodicFlushTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
+    clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+    clone.maxKeyValueSize                     = this.maxKeyValueSize;
+    clone.pool                                = this.pool;
+    clone.listener                            = this.listener;
+    return clone;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/42c8b5be/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 65ddc78..0e51644 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -11,12 +11,11 @@
 
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Configuration parameters for the connection.
  * Configuration is a heavy weight registry that does a lot of string operations and regex
matching.
@@ -30,10 +29,18 @@ public class ConnectionConfiguration {
 
   public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
   public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
+  public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS =
+          "hbase.client.write.buffer.periodicflush.timeout.ms";
+  public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS =
+          "hbase.client.write.buffer.periodicflush.timertick.ms";
+  public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
+  public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1
second
   public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
 
   private final long writeBufferSize;
+  private final long writeBufferPeriodicFlushTimeoutMs;
+  private final long writeBufferPeriodicFlushTimerTickMs;
   private final int metaOperationTimeout;
   private final int operationTimeout;
   private final int scannerCaching;
@@ -51,6 +58,14 @@ public class ConnectionConfiguration {
   ConnectionConfiguration(Configuration conf) {
     this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
 
+    this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
+
+    this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT);
+
     this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
 
@@ -88,6 +103,8 @@ public class ConnectionConfiguration {
   @VisibleForTesting
   protected ConnectionConfiguration() {
     this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
+    this.writeBufferPeriodicFlushTimeoutMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
+    this.writeBufferPeriodicFlushTimerTickMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT;
     this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
@@ -104,6 +121,14 @@ public class ConnectionConfiguration {
     return writeBufferSize;
   }
 
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return writeBufferPeriodicFlushTimeoutMs;
+  }
+
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return writeBufferPeriodicFlushTimerTickMs;
+  }
+
   public int getMetaOperationTimeout() {
     return metaOperationTimeout;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/42c8b5be/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e8c7b73..52960b4 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -19,11 +19,11 @@
 
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.BufferedMutatorImpl.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -51,7 +51,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1133,6 +1132,119 @@ public class TestAsyncProcess {
   }
 
   @Test
+  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+
+    checkPeriodicFlushParameters(conn, ap,
+            1234, 1234,
+            1234, 1234);
+    checkPeriodicFlushParameters(conn, ap,
+            0,    0,
+            0,    MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+    checkPeriodicFlushParameters(conn, ap,
+            -1234,    0,
+            -1234,    MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+    checkPeriodicFlushParameters(conn, ap,
+            1,    1,
+            1,    MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+  }
+
+  private void checkPeriodicFlushParameters(ClusterConnection conn,
+                                            MyAsyncProcess ap,
+                                            long setTO, long expectTO,
+                                            long setTT, long expectTT
+  ) {
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+    // The BufferedMutatorParams does nothing with the value
+    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
+    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
+    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
+
+    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
+    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, null, null, bufferParam);
+    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
+
+    // The BufferedMutatorImpl corrects illegal values (direct via setter)
+    BufferedMutatorImpl ht2 =
+            new BufferedMutatorImpl(conn, null, null, createBufferedMutatorParams(ap, DUMMY_TABLE));
+    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
+    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
+
+  }
+
+  @Test
+  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
+    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
+    bufferParam.writeBufferSize(10000);  // Write buffer set to much larger than the single
record
+
+    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, null, null, bufferParam);
+    ht.ap = ap;
+
+    // Verify if BufferedMutator has the right settings.
+    Assert.assertEquals(10000, ht.getWriteBufferSize());
+    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+            ht.getWriteBufferPeriodicFlushTimerTickMs());
+
+    Put put = createPut(1, true);
+
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- Insert, flush immediately, MUST NOT flush automatically
+    ht.mutate(put);
+    ht.flush();
+
+    Thread.sleep(1000);
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- Insert, NO flush, MUST flush automatically
+    ht.mutate(put);
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // The timerTick should fire every 100ms, so after twice that we must have
+    // seen at least 1 tick and we should see an automatic flush
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // Ensure it does not flush twice
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
+    ht.disableWriteBufferPeriodicFlush();
+    ht.mutate(put);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // Wait for at least 1 timerTick, we should see NO flushes.
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // Reenable periodic flushing, a flush seems to take about 1 second
+    // so we wait for 2 seconds and it should have finished the flush.
+    ht.setWriteBufferPeriodicFlush(1, 100);
+    Thread.sleep(2000);
+    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+  }
+
+  @Test
   public void testTaskCheckerHost() throws IOException {
     final int maxTotalConcurrentTasks = 100;
     final int maxConcurrentTasksPerServer = 2;
@@ -1604,6 +1716,12 @@ public class TestAsyncProcess {
     return ap;
   }
 
+  private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap,
+                                                                   TableName name) {
+    return new BufferedMutatorParams(name)
+            .pool(ap.pool);
+  }
+
   private static List<Get> makeTimelineGets(byte[]... rows) {
     List<Get> result = new ArrayList<Get>();
     for (byte[] row : rows) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/42c8b5be/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
new file mode 100644
index 0000000..ead9469
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestBufferedMutatorParams {
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Just to create in instance, this doesn't actually function.
+   */
+  private class MockExecutorService implements ExecutorService {
+
+    @Override
+    public void execute(Runnable command) {
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
+      return false;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      return null;
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+      return null;
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+      return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+        Collection<? extends Callable<T>> tasks) {
+      return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
{
+      return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+      return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+        long timeout, TimeUnit unit) {
+      return null;
+    }
+  }
+
+  /**
+   * Just to create an instance, this doesn't actually function.
+   */
+  private static class MockExceptionListener implements BufferedMutator.ExceptionListener
{
+    @Override
+    public void onException(RetriesExhaustedWithDetailsException exception,
+        BufferedMutator mutator) {
+    }
+  }
+
+  @Test
+  public void testClone() {
+    ExecutorService pool = new MockExecutorService();
+    final String tableName = name.getMethodName();
+    BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName));
+
+    BufferedMutator.ExceptionListener listener = new MockExceptionListener();
+    bmp
+            .writeBufferSize(17)
+            .setWriteBufferPeriodicFlushTimeoutMs(123)
+            .setWriteBufferPeriodicFlushTimerTickMs(456)
+            .maxKeyValueSize(13)
+            .pool(pool)
+            .listener(listener);
+    BufferedMutatorParams clone = bmp.clone();
+
+    // Confirm some literals
+    assertEquals(tableName, clone.getTableName().toString());
+    assertEquals(17, clone.getWriteBufferSize());
+    assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
+    assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
+    assertEquals(13, clone.getMaxKeyValueSize());
+
+    cloneTest(bmp, clone);
+
+    BufferedMutatorParams cloneWars = clone.clone();
+    cloneTest(clone, cloneWars);
+    cloneTest(bmp, cloneWars);
+  }
+
+  /**
+   * Confirm all fields are equal.
+   * @param some some instance
+   * @param clone a clone of that instance, but not the same instance.
+   */
+  private void cloneTest(BufferedMutatorParams some,
+      BufferedMutatorParams clone) {
+    assertFalse(some == clone);
+    assertEquals(some.getTableName().toString(),
+        clone.getTableName().toString());
+    assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize());
+    assertEquals(some.getWriteBufferPeriodicFlushTimeoutMs(),
+        clone.getWriteBufferPeriodicFlushTimeoutMs());
+    assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
+        clone.getWriteBufferPeriodicFlushTimerTickMs());
+    assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
+    assertTrue(some.getListener() == clone.getListener());
+    assertTrue(some.getPool() == clone.getPool());
+  }
+
+}


Mime
View raw message