hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [1/2] hbase git commit: HBASE-16451 Procedure v2 - Test WAL protobuf entry size limit
Date Wed, 24 Aug 2016 04:06:09 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 6e9b49cac -> 73818646b
  refs/heads/master 32c21f459 -> 97b164ac3


HBASE-16451 Procedure v2 - Test WAL protobuf entry size limit


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/73818646
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/73818646
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/73818646

Branch: refs/heads/branch-1
Commit: 73818646be6ac39aad1603e4d95766d8491cf0f0
Parents: 6e9b49c
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Tue Aug 23 20:43:29 2016 -0700
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Tue Aug 23 20:52:40 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/util/ByteSlot.java  | 18 ++++++--
 .../procedure2/ProcedureTestingUtility.java     | 47 ++++++++++++++++++--
 .../store/wal/TestStressWALProcedureStore.java  | 15 +++++++
 .../store/wal/TestWALProcedureStore.java        | 14 ++----
 4 files changed, 77 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/73818646/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
index 8904116..c4ed9b7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/ByteSlot.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ByteSlot extends OutputStream {
-  private static final int DOUBLE_GROW_LIMIT = 1 << 20;
+  private static final int LARGE_GROW_SIZE_THRESHOLD = 8 << 20;
+  private static final int LARGE_GROW_SIZE = 1 << 20;
+  private static final int RESET_THRESHOLD = 64 << 20;
   private static final int GROW_ALIGN = 128;
 
   private byte[] buf;
@@ -51,6 +53,9 @@ public class ByteSlot extends OutputStream {
   private int size;
 
   public void reset() {
+    if (buf != null && buf.length > RESET_THRESHOLD) {
+      buf = null;
+    }
     head = 0;
     size = 0;
   }
@@ -101,11 +106,16 @@ public class ByteSlot extends OutputStream {
     if (buf == null) {
       buf = new byte[minCapacity];
     } else if (minCapacity > buf.length) {
-      int newCapacity = buf.length << 1;
-      if (minCapacity > newCapacity || newCapacity > DOUBLE_GROW_LIMIT) {
+      int newCapacity;
+      if (buf.length <= LARGE_GROW_SIZE_THRESHOLD) {
+        newCapacity = buf.length << 1;
+      } else {
+        newCapacity = buf.length + LARGE_GROW_SIZE;
+      }
+      if (minCapacity > newCapacity) {
         newCapacity = minCapacity;
       }
       buf = Arrays.copyOf(buf, newCapacity);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/73818646/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 041461b..8b49e53 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -31,15 +31,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.util.Threads;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -89,6 +90,24 @@ public class ProcedureTestingUtility {
     procExecutor.start(execThreads, failOnCorrupted);
   }
 
+  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
+      throws Exception {
+    procStore.stop(false);
+    procStore.start(procStore.getNumThreads());
+    procStore.recoverLease();
+    procStore.load(loader);
+  }
+
+  public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
+      long runnableCount, int completedCount, int corruptedCount) throws Exception {
+    final LoadCounter loader = new LoadCounter();
+    storeRestart(procStore, loader);
+    assertEquals(maxProcId, loader.getMaxProcId());
+    assertEquals(runnableCount, loader.getRunnableCount());
+    assertEquals(completedCount, loader.getCompletedCount());
+    assertEquals(corruptedCount, loader.getCorruptedCount());
+  }
+
   public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
       boolean value) {
     if (procExecutor.testing == null) {
@@ -220,6 +239,8 @@ public class ProcedureTestingUtility {
   }
 
   public static class TestProcedure extends Procedure<Void> {
+    private byte[] data = null;
+
     public TestProcedure() {}
 
     public TestProcedure(long procId) {
@@ -227,6 +248,11 @@ public class ProcedureTestingUtility {
     }
 
     public TestProcedure(long procId, long parentId) {
+      this(procId, parentId, null);
+    }
+
+    public TestProcedure(long procId, long parentId, byte[] data) {
+      setData(data);
       setProcId(procId);
       if (parentId > 0) {
         setParentProcId(parentId);
@@ -241,6 +267,10 @@ public class ProcedureTestingUtility {
       setState(ProcedureState.FINISHED);
     }
 
+    public void setData(final byte[] data) {
+      this.data = data;
+    }
+
     @Override
     protected Procedure[] execute(Void env) { return null; }
 
@@ -251,10 +281,21 @@ public class ProcedureTestingUtility {
     protected boolean abort(Void env) { return false; }
 
     @Override
-    protected void serializeStateData(final OutputStream stream) throws IOException { }
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+      StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0);
+      if (data != null) stream.write(data);
+    }
 
     @Override
-    protected void deserializeStateData(final InputStream stream) throws IOException { }
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+      int len = StreamUtils.readRawVarint32(stream);
+      if (len > 0) {
+        data = new byte[len];
+        stream.read(data);
+      } else {
+        data = null;
+      }
+    }
   }
 
   public static class LoadCounter implements ProcedureStore.ProcedureLoader {

http://git-wip-us.apache.org/repos/asf/hbase/blob/73818646/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
index 8e15aef..8b1c49a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 
@@ -129,4 +130,18 @@ public class TestStressWALProcedureStore {
     assertTrue(procStore.getStoreTracker().isEmpty());
     assertEquals(1, procStore.getActiveLogs().size());
   }
+
+  @Test
+  public void testEntrySizeLimit() throws Exception {
+    final int NITEMS = 20;
+    for (int i = 1; i <= NITEMS; ++i) {
+      final byte[] data = new byte[256 << i];
+      LOG.info(String.format("Writing %s", StringUtils.humanSize(data.length)));
+      TestProcedure proc = new TestProcedure(i, 0, data);
+      procStore.insert(proc, null);
+    }
+
+    // check that we are able to read the big proc-blobs
+    ProcedureTestingUtility.storeRestartAndAssert(procStore, NITEMS, NITEMS, 0, 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/73818646/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 8779d44..2bf9556 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -91,10 +91,7 @@ public class TestWALProcedureStore {
   }
 
   private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
-    procStore.stop(false);
-    procStore.start(PROCEDURE_STORE_SLOTS);
-    procStore.recoverLease();
-    procStore.load(loader);
+    ProcedureTestingUtility.storeRestart(procStore, loader);
   }
 
   @Test
@@ -486,6 +483,7 @@ public class TestWALProcedureStore {
     assertEquals(0, loader.getCorruptedCount());
   }
 
+  @Test
   public void testLoadChildren() throws Exception {
     TestProcedure a = new TestProcedure(1, 0);
     TestProcedure b = new TestProcedure(2, 1);
@@ -523,12 +521,8 @@ public class TestWALProcedureStore {
 
   private void restartAndAssert(long maxProcId, long runnableCount,
       int completedCount, int corruptedCount) throws Exception {
-    final LoadCounter loader = new LoadCounter();
-    storeRestart(loader);
-    assertEquals(maxProcId, loader.getMaxProcId());
-    assertEquals(runnableCount, loader.getRunnableCount());
-    assertEquals(completedCount, loader.getCompletedCount());
-    assertEquals(corruptedCount, loader.getCorruptedCount());
+    ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
+      runnableCount, completedCount, corruptedCount);
   }
 
   private void corruptLog(final FileStatus logFile, final long dropBytes)


Mime
View raw message