hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-14687 Un-synchronize BufferedMutator
Date Fri, 30 Oct 2015 20:42:16 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 de8917224 -> 3bdfa782f


HBASE-14687 Un-synchronize BufferedMutator

Summary:
Use concurrent collections and atomic longs to keep track of edits in buffered mutator.
This keeps buffered mutator as thread safe but it means that shared buffered mutators are
not contending on thread locking.

Test Plan: Unit Tests.

Differential Revision: https://reviews.facebook.net/D49467


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3bdfa782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3bdfa782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3bdfa782

Branch: refs/heads/branch-1
Commit: 3bdfa782f34666423254735591c9fd017cc8f3cd
Parents: de89172
Author: Elliott Clark <eclark@apache.org>
Authored: Mon Oct 26 14:47:36 2015 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Fri Oct 30 13:39:38 2015 -0700

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 106 +++++++++++++------
 .../hadoop/hbase/client/TestAsyncProcess.java   |  32 +-----
 2 files changed, 78 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3bdfa782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 8f43722..9ba2e13 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -15,6 +15,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,8 +29,10 @@ import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * <p>
@@ -38,6 +41,12 @@ import java.util.concurrent.TimeUnit;
  * a {@link Connection} and call {@link #close()} afterwards.
  * </p>
  *
+ * <p>
+ * While this can be used across threads, great care should be used when doing so.
+ * Errors are global to the buffered mutator and the Exceptions can be thrown on any
+ * thread that causes the flush for requests.
+ * </p>
+ *
  * @see ConnectionFactory
  * @see Connection
  * @since 1.0.0
@@ -53,12 +62,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
   protected ClusterConnection connection; // non-final so can be overridden in test
   private final TableName tableName;
   private volatile Configuration conf;
-  private List<Row> writeAsyncBuffer = new LinkedList<>();
+  @VisibleForTesting
+  final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
+  @VisibleForTesting
+  AtomicLong currentWriteBufferSize = new AtomicLong(0);
+
   private long writeBufferSize;
   private final int maxKeyValueSize;
-  protected long currentWriteBufferSize = 0;
   private boolean closed = false;
   private final ExecutorService pool;
+
+  @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
 
   BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
@@ -94,38 +108,41 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
-  public synchronized void mutate(Mutation m) throws InterruptedIOException,
+  public void mutate(Mutation m) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
     mutate(Arrays.asList(m));
   }
 
   @Override
-  public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
+  public void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
+
     if (closed) {
       throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
     }
 
+    long toAddSize = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
         validatePut((Put) m);
       }
-        currentWriteBufferSize += m.heapSize();
+      toAddSize += m.heapSize();
     }
 
-      // This behavior is highly non-intuitive... it does not protect us against
-      // 94-incompatible behavior, which is a timing issue because hasError, the below code
-      // and setter of hasError are not synchronized. Perhaps it should be removed.
-      if (ap.hasError()) {
-        writeAsyncBuffer.addAll(ms);
-        backgroundFlushCommits(true);
-      } else {
-        writeAsyncBuffer.addAll(ms);
-      }
-
+    // This behavior is highly non-intuitive... it does not protect us against
+    // 94-incompatible behavior, which is a timing issue because hasError, the below code
+    // and setter of hasError are not synchronized. Perhaps it should be removed.
+    if (ap.hasError()) {
+      currentWriteBufferSize.addAndGet(toAddSize);
+      writeAsyncBuffer.addAll(ms);
+      backgroundFlushCommits(true);
+    } else {
+      currentWriteBufferSize.addAndGet(toAddSize);
+      writeAsyncBuffer.addAll(ms);
+    }
 
     // Now try and queue what needs to be queued.
-    while (currentWriteBufferSize > writeBufferSize) {
+    while (currentWriteBufferSize.get() > writeBufferSize) {
       backgroundFlushCommits(false);
     }
   }
@@ -137,15 +154,15 @@ public class BufferedMutatorImpl implements BufferedMutator {
 
   @Override
   public synchronized void close() throws IOException {
-    if (this.closed) {
-      return;
-    }
     try {
+      if (this.closed) {
+        return;
+      }
       // As we can have an operation in progress even if the buffer is empty, we call
       // backgroundFlushCommits at least one time.
       backgroundFlushCommits(true);
       this.pool.shutdown();
-      boolean terminated = false;
+      boolean terminated;
       int loopCnt = 0;
       do {
         // wait until the pool has terminated
@@ -156,8 +173,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
           break;
         }
       } while (!terminated);
+
     } catch (InterruptedException e) {
       LOG.warn("waitForTermination interrupted");
+
     } finally {
       this.closed = true;
     }
@@ -179,19 +198,44 @@ public class BufferedMutatorImpl implements BufferedMutator {
   * @param synchronous - if true, sends all the writes and wait for all of them to finish before
    *        returning.
    */
-  private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
+  private void backgroundFlushCommits(boolean synchronous) throws
+      InterruptedIOException,
       RetriesExhaustedWithDetailsException {
+
+    LinkedList<Mutation> buffer = new LinkedList<>();
+    // Keep track of the size so that this thread doesn't spin forever
+    long dequeuedSize = 0;
+
     try {
+      // Grab all of the available mutations.
+      Mutation m;
+
+      // If there's no buffer size drain everything. If there is a buffersize drain up to twice
+      // that amount. This should keep the loop from continually spinning if there are threads
+      // that keep adding more data to the buffer.
+      while (
+          (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || synchronous)
+              && (m = writeAsyncBuffer.poll()) != null) {
+        buffer.add(m);
+        long size = m.heapSize();
+        dequeuedSize += size;
+        currentWriteBufferSize.addAndGet(-size);
+      }
+
+      if (!synchronous && dequeuedSize == 0) {
+        return;
+      }
+
       if (!synchronous) {
-        ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        ap.submit(tableName, buffer, true, null, false);
         if (ap.hasError()) {
           LOG.debug(tableName + ": One or more of the operations have failed -"
               + " waiting for all operation in progress to finish (successfully or not)");
         }
       }
       if (synchronous || ap.hasError()) {
-        while (!writeAsyncBuffer.isEmpty()) {
-          ap.submit(tableName, writeAsyncBuffer, true, null, false);
+        while (!buffer.isEmpty()) {
+          ap.submit(tableName, buffer, true, null, false);
         }
         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
         if (error != null) {
@@ -203,11 +247,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
         }
       }
     } finally {
-      currentWriteBufferSize = 0;
-      for (Row mut : writeAsyncBuffer) {
-        if (mut instanceof Mutation) {
-          currentWriteBufferSize += ((Mutation) mut).heapSize();
-        }
+      for (Mutation mut : buffer) {
+        long size = mut.heapSize();
+        currentWriteBufferSize.addAndGet(size);
+        dequeuedSize -= size;
+        writeAsyncBuffer.add(mut);
       }
     }
   }
@@ -221,7 +265,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
   public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
       InterruptedIOException {
     this.writeBufferSize = writeBufferSize;
-    if (currentWriteBufferSize > writeBufferSize) {
+    if (currentWriteBufferSize.get() > writeBufferSize) {
       flush();
     }
   }
@@ -241,6 +285,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
    */
   @Deprecated
   public List<Row> getWriteBuffer() {
-    return this.writeAsyncBuffer;
+    return Arrays.asList(writeAsyncBuffer.toArray(new Row[0]));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3bdfa782/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 5c21740..3d76fad 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -413,7 +413,7 @@ public class TestAsyncProcess {
   }
 
   @Rule
-  public Timeout timeout = new Timeout(10000); // 10 seconds max per method tested
+  public Timeout timeout = Timeout.millis(10000); // 10 seconds max per method tested
 
   @Test
   public void testSubmit() throws Exception {
@@ -703,7 +703,7 @@ public class TestAsyncProcess {
 
     Put put = createPut(1, false);
 
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
     try {
       ht.put(put);
       if (bufferOn) {
@@ -712,7 +712,7 @@ public class TestAsyncProcess {
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
-    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
+    Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
     // The table should have sent one request, maybe after multiple attempts
     AsyncRequestFuture ars = null;
     for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -765,32 +765,6 @@ public class TestAsyncProcess {
     Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
   }
 
-
-/*
-  @Test
-  public void testWithNoClearOnFail() throws IOException {
-    HTable ht = new HTable();
-    ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
-    ht.setAutoFlushTo(false);
-
-    Put p = createPut(1, false);
-    ht.put(p);
-    Assert.assertEquals(0, ht.writeAsyncBuffer.size());
-
-    try {
-      ht.flushCommits();
-    } catch (RetriesExhaustedWithDetailsException expected) {
-    }
-    Assert.assertEquals(1, ht.writeAsyncBuffer.size());
-
-    try {
-      ht.close();
-    } catch (RetriesExhaustedWithDetailsException expected) {
-    }
-    Assert.assertEquals(1, ht.writeAsyncBuffer.size());
-  }
-  */
-
   @Test
   public void testBatch() throws IOException, InterruptedException {
     ClusterConnection conn = new MyConnectionImpl(conf);


Mime
View raw message