hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject hbase git commit: HBASE-16802 Procedure v2 - group procedure cleaning
Date Tue, 11 Oct 2016 23:50:05 GMT
Repository: hbase
Updated Branches:
  refs/heads/master eb52e2682 -> 662a1b241


HBASE-16802 Procedure v2 - group procedure cleaning


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/662a1b24
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/662a1b24
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/662a1b24

Branch: refs/heads/master
Commit: 662a1b241f05f93aee9a5b05d7929a482a5bfcd5
Parents: eb52e26
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Tue Oct 11 16:17:09 2016 -0700
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Tue Oct 11 16:48:21 2016 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     | 18 ++++++-
 .../procedure2/store/NoopProcedureStore.java    |  5 ++
 .../hbase/procedure2/store/ProcedureStore.java  | 10 ++++
 .../procedure2/store/wal/WALProcedureStore.java | 38 +++++++++++++
 .../procedure2/ProcedureTestingUtility.java     | 12 ++++-
 .../store/wal/TestWALProcedureStore.java        | 57 ++++++++++++++++++--
 6 files changed, 132 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 0572dcf..2eeef9e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -138,6 +138,9 @@ public class ProcedureExecutor<TEnvironment> {
     private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
 
+    private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
+    private static final int DEFAULT_BATCH_SIZE = 32;
+
     private final Map<Long, ProcedureInfo> completed;
     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
     private final ProcedureStore store;
@@ -165,6 +168,10 @@ public class ProcedureExecutor<TEnvironment> {
 
       final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
       final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+      final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+
+      final long[] batchIds = new long[batchSize];
+      int batchCount = 0;
 
       final long now = EnvironmentEdgeManager.currentTime();
       final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
@@ -179,15 +186,22 @@ public class ProcedureExecutor<TEnvironment> {
           if (isDebugEnabled) {
             LOG.debug("Evict completed procedure: " + procInfo);
           }
-          store.delete(entry.getKey());
+          batchIds[batchCount++] = entry.getKey();
+          if (batchCount == batchIds.length) {
+            store.delete(batchIds, 0, batchCount);
+            batchCount = 0;
+          }
           it.remove();
 
-          NonceKey nonceKey = procInfo.getNonceKey();
+          final NonceKey nonceKey = procInfo.getNonceKey();
           if (nonceKey != null) {
             nonceKeysToProcIdsMap.remove(nonceKey);
           }
         }
       }
+      if (batchCount > 0) {
+        store.delete(batchIds, 0, batchCount);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index c9808a1..82ef8f0 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -75,4 +75,9 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   public void delete(Procedure proc, long[] subprocs) {
     // no-op
   }
+
+  @Override
+  public void delete(long[] procIds, int offset, int count) {
+    // no-op
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 11216d8..7df5226 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -196,4 +196,14 @@ public interface ProcedureStore {
    * @param subProcIds the IDs of the sub-procedure to remove.
    */
   void delete(Procedure parentProc, long[] subProcIds);
+
+  /**
+   * The specified procIds were removed from the executor,
+   * due to completion, abort or failure.
+   * The store implementor should remove all the information about the specified procIds.
+   * @param procIds the IDs of the procedures to remove.
+   * @param offset the array offset from where to start to delete
+   * @param count the number of IDs to delete
+   */
+  void delete(long[] procIds, int offset, int count);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 1e60402..3a46f8f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -465,6 +465,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   @Override
   public void delete(final Procedure proc, final long[] subProcIds) {
+    assert proc != null : "expected a non-null procedure";
+    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
     if (LOG.isTraceEnabled()) {
       LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
     }
@@ -486,6 +488,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
+  @Override
+  public void delete(final long[] procIds, final int offset, final int count) {
+    if (count == 0) return;
+    if (offset == 0 && count == procIds.length) {
+      delete(procIds);
+    } else if (count == 1) {
+      delete(procIds[offset]);
+    } else {
+      delete(Arrays.copyOfRange(procIds, offset, offset + count));
+    }
+  }
+
+  private void delete(final long[] procIds) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Delete " + Arrays.toString(procIds));
+    }
+
+    final ByteSlot slot = acquireSlot();
+    try {
+      // Serialize the delete
+      for (int i = 0; i < procIds.length; ++i) {
+        ProcedureWALFormat.writeDelete(slot, procIds[i]);
+      }
+
+      // Push the transaction data and wait until it is persisted
+      pushData(PushType.DELETE, slot, -1, procIds);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+  }
+
   private ByteSlot acquireSlot() {
     ByteSlot slot = slotsCache.poll();
     return slot != null ? slot : new ByteSlot();

http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index d767a0f..0b85ff8 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -97,7 +97,7 @@ public class ProcedureTestingUtility {
     procStore.load(loader);
   }
 
-  public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
+  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
       long runnableCount, int completedCount, int corruptedCount) throws Exception {
     final LoadCounter loader = new LoadCounter();
     storeRestart(procStore, loader);
@@ -105,6 +105,7 @@ public class ProcedureTestingUtility {
     assertEquals(runnableCount, loader.getRunnableCount());
     assertEquals(completedCount, loader.getCompletedCount());
     assertEquals(corruptedCount, loader.getCorruptedCount());
+    return loader;
   }
 
   public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv>
procExecutor,
@@ -366,6 +367,15 @@ public class ProcedureTestingUtility {
       return corrupted.size();
     }
 
+    public boolean isRunnable(final long procId) {
+      for (Procedure proc: runnable) {
+        if (proc.getProcId() == procId) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public void setMaxProcId(long maxProcId) {
       this.maxProcId = maxProcId;

http://git-wip-us.apache.org/repos/asf/hbase/blob/662a1b24/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 5353d62..7ecffa1 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -404,13 +404,13 @@ public class TestWALProcedureStore {
     procStore.insert(procs[1], null);
     procStore.insert(procs[2], null);
     procStore.insert(procs[3], null);
-    procStore.delete(procs[0], null);
+    procStore.delete(procs[0].getProcId());
     procStore.rollWriterForTesting();
-    procStore.delete(procs[2], null);
+    procStore.delete(procs[2].getProcId());
     procStore.update(procs[3]);
     procStore.insert(procs[4], null);
     procStore.rollWriterForTesting();
-    procStore.delete(procs[4], null);
+    procStore.delete(procs[4].getProcId());
     procStore.insert(procs[5], null);
 
     // Stop the store
@@ -737,9 +737,56 @@ public class TestWALProcedureStore {
     restartAndAssert(3, 0, 1, 0);
   }
 
-  private void restartAndAssert(long maxProcId, long runnableCount,
+  @Test
+  public void testBatchDelete() throws Exception {
+    for (int i = 1; i < 10; ++i) {
+      procStore.insert(new TestProcedure(i), null);
+    }
+
+    // delete nothing
+    long[] toDelete = new long[] { 1, 2, 3, 4 };
+    procStore.delete(toDelete, 2, 0);
+    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
+    for (int i = 1; i < 10; ++i) {
+      assertEquals(true, loader.isRunnable(i));
+    }
+
+    // delete the full "toDelete" array (2, 4, 6, 8)
+    toDelete = new long[] { 2, 4, 6, 8 };
+    procStore.delete(toDelete, 0, toDelete.length);
+    loader = restartAndAssert(9, 5, 0, 0);
+    for (int i = 1; i < 10; ++i) {
+      assertEquals(i % 2 != 0, loader.isRunnable(i));
+    }
+
+    // delete a slice of "toDelete" (1, 3)
+    toDelete = new long[] { 5, 7, 1, 3, 9 };
+    procStore.delete(toDelete, 2, 2);
+    loader = restartAndAssert(9, 3, 0, 0);
+    for (int i = 1; i < 10; ++i) {
+      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
+    }
+
+    // delete a single item (5)
+    toDelete = new long[] { 5 };
+    procStore.delete(toDelete, 0, 1);
+    loader = restartAndAssert(9, 2, 0, 0);
+    for (int i = 1; i < 10; ++i) {
+      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
+    }
+
+    // delete remaining using a slice of "toDelete" (7, 9)
+    toDelete = new long[] { 0, 7, 9 };
+    procStore.delete(toDelete, 1, 2);
+    loader = restartAndAssert(0, 0, 0, 0);
+    for (int i = 1; i < 10; ++i) {
+      assertEquals(false, loader.isRunnable(i));
+    }
+  }
+
+  private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
       int completedCount, int corruptedCount) throws Exception {
-    ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
+    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
       runnableCount, completedCount, corruptedCount);
   }
 


Mime
View raw message