hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations
Date Tue, 02 Jan 2018 09:53:41 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 6a0e6fefd -> a6081d30f


HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a6081d30
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a6081d30
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a6081d30

Branch: refs/heads/master
Commit: a6081d30f930d9599f7d52ab440b3205c7f2a7bf
Parents: 6a0e6fe
Author: Niels Basjes <nbasjes@bol.com>
Authored: Sun Dec 31 11:58:24 2017 +0100
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Tue Jan 2 17:26:22 2018 +0800

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a6081d30/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 13b1a81..b171fc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
   private final long writeBufferSize;
 
-  private long  writeBufferPeriodicFlushTimeoutMs;
-  private long  writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
+  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
+  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
+          new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
   private Timer writeBufferPeriodicFlushTimer = null;
 
   private final int maxKeyValueSize;
@@ -188,7 +189,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
 
     if (currentWriteBufferSize.get() == 0) {
-      firstRecordInBufferTimestamp = System.currentTimeMillis();
+      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
     }
 
     // This behavior is highly non-intuitive... it does not protect us against
@@ -214,23 +215,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
 
   @VisibleForTesting
   protected long getExecutedWriteBufferPeriodicFlushes() {
-    return executedWriteBufferPeriodicFlushes;
+    return executedWriteBufferPeriodicFlushes.get();
   }
 
-  private long firstRecordInBufferTimestamp = 0;
-  private long executedWriteBufferPeriodicFlushes = 0;
+  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
+  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
 
   private void timerCallbackForWriteBufferPeriodicFlush() {
     if (currentWriteBufferSize.get() == 0) {
       return; // Nothing to flush
     }
     long now = System.currentTimeMillis();
-    if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
+    if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
       return; // No need to flush yet
     }
     // The first record in the writebuffer has been in there too long --> flush
     try {
-      executedWriteBufferPeriodicFlushes++;
+      executedWriteBufferPeriodicFlushes.incrementAndGet();
       flush();
     } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
       LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
@@ -370,18 +371,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
-  public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
-    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
-    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
+    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs.get();
+    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
 
     // Both parameters have minimal values.
-    this.writeBufferPeriodicFlushTimeoutMs   = Math.max(0, timeoutMs);
-    this.writeBufferPeriodicFlushTimerTickMs =
-            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
+    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
+    writeBufferPeriodicFlushTimerTickMs.set(
+            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
 
     // If something changed we stop the old Timer.
-    if (this.writeBufferPeriodicFlushTimeoutMs   != originalTimeoutMs  ||
-        this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
+    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
+        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
       if (writeBufferPeriodicFlushTimer != null) {
         writeBufferPeriodicFlushTimer.cancel();
         writeBufferPeriodicFlushTimer = null;
@@ -390,25 +391,26 @@ public class BufferedMutatorImpl implements BufferedMutator {
 
     // If we have the need for a timer and there is none we start it
     if (writeBufferPeriodicFlushTimer == null &&
-        writeBufferPeriodicFlushTimeoutMs > 0) {
+        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
       writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
       writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
         @Override
         public void run() {
           BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
         }
-      }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
+      }, writeBufferPeriodicFlushTimerTickMs.get(),
+         writeBufferPeriodicFlushTimerTickMs.get());
     }
   }
 
   @Override
   public long getWriteBufferPeriodicFlushTimeoutMs() {
-    return this.writeBufferPeriodicFlushTimeoutMs;
+    return writeBufferPeriodicFlushTimeoutMs.get();
   }
 
   @Override
   public long getWriteBufferPeriodicFlushTimerTickMs() {
-    return this.writeBufferPeriodicFlushTimerTickMs;
+    return writeBufferPeriodicFlushTimerTickMs.get();
   }
 
   @Override


Mime
View raw message