hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject hbase git commit: HBASE-16094 Improve cleaning up of proc wals. Makes the proc log cleaner smarter. Current method: starting from oldest log, keep deleting WALs until we encounter one which contains state of an active proc. Note that the state may be sta
Date Thu, 18 Aug 2016 20:30:35 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 642d2fe0f -> ce8827034


HBASE-16094 Improve cleaning up of proc wals.
Makes the proc log cleaner smarter.
Current method: starting from oldest log, keep deleting WALs until we encounter one which
contains state of an active proc. Note that the state may be stale.
New method: Same as current method, but will also delete the wals which contain stale state.
In the starting, iterate from newest completed log (just before currently active log) and
mark it for delete if it doesn't contain any state which will be required in case of recovery.
Tested: unittests.

Change-Id: Ie7499652e25af4aac8e9d71a42d60c61a03fd4e4


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ce882703
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ce882703
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ce882703

Branch: refs/heads/master
Commit: ce88270340872b43083ee7b79ac3027557597224
Parents: 642d2fe
Author: Apekshit Sharma <appy@apache.org>
Authored: Fri Jul 29 12:43:20 2016 -0700
Committer: Apekshit Sharma <appy@apache.org>
Committed: Thu Aug 18 13:30:21 2016 -0700

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java | 163 ++++++++++++++-----
 .../procedure2/store/wal/ProcedureWALFile.java  |  17 +-
 .../store/wal/ProcedureWALFormat.java           |   2 +-
 .../procedure2/store/wal/WALProcedureStore.java |  80 ++++++---
 .../store/TestProcedureStoreTracker.java        | 109 +++++++++++++
 .../store/wal/TestWALProcedureStore.java        | 111 +++++++++++++
 6 files changed, 420 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 26422a3..78d6a44 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.procedure2.store;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
@@ -40,7 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 @InterfaceStability.Evolving
 public class ProcedureStoreTracker {
   // Key is procedure id corresponding to first bit of the bitmap.
-  private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
+  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
 
   /**
    * If true, do not remove bits corresponding to deleted procedures. Note that this can
result
@@ -78,6 +76,16 @@ public class ProcedureStoreTracker {
      * Mimics {@link ProcedureStoreTracker#partial}.
      */
     private final boolean partial;
+
+    /* ----------------------
+     * |  updated | deleted |  meaning
+     * |     0    |   0     |  proc exists, but hasn't been updated since last resetUpdates().
+     * |     1    |   0     |  proc was updated (but not deleted).
+     * |     1    |   1     |  proc was deleted.
+     * |     0    |   1     |  proc doesn't exist (maybe never created, maybe deleted in
past).
+    /* ----------------------
+     */
+
     /**
      * Set of procedures which have been updated since last {@link #resetUpdates()}.
      * Useful to track procedures which have been updated since last WAL write.
@@ -135,6 +143,25 @@ public class ProcedureStoreTracker {
       this.partial = false;
     }
 
+    public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+      start = data.getStartId();
+      int size = data.getUpdatedCount();
+      updated = new long[size];
+      deleted = new long[size];
+      for (int i = 0; i < size; ++i) {
+        updated[i] = data.getUpdated(i);
+        deleted[i] = data.getDeleted(i);
+      }
+      partial = false;
+    }
+
+    public BitSetNode(BitSetNode other) {
+      this.start = other.start;
+      this.partial = other.partial;
+      this.updated = other.updated.clone();
+      this.deleted = other.deleted.clone();
+    }
+
     public void update(final long procId) {
       updateState(procId, false);
     }
@@ -221,10 +248,38 @@ public class ProcedureStoreTracker {
       }
     }
 
-    // ========================================================================
-    //  Convert to/from Protocol Buffer.
-    // ========================================================================
+    /**
+     * If an active (non-deleted) procedure in current BitSetNode has been updated in {@code
other}
+     * BitSetNode, then delete it from current node.
+     * @return true if node changed, i.e. some procedure(s) from {@code other} was subtracted
from
+     * current node.
+     */
+    public boolean subtract(BitSetNode other) {
+      // Assert that other node intersects with this node.
+      assert !(other.getEnd() < this.start) && !(this.getEnd() < other.start);
+      int thisOffset = 0, otherOffset = 0;
+      if (this.start < other.start) {
+        thisOffset = (int) (other.start - this.start) / BITS_PER_WORD;
+      } else {
+        otherOffset = (int) (this.start - other.start) / BITS_PER_WORD;
+      }
+      int size = Math.min(this.updated.length - thisOffset, other.updated.length - otherOffset);
+      boolean nonZeroIntersect = false;
+      for (int i = 0; i < size; i++) {
+        long intersect = ~this.deleted[thisOffset + i] & other.updated[otherOffset +
i];
+        if (intersect != 0) {
+          this.deleted[thisOffset + i] |= intersect;
+          nonZeroIntersect = true;
+        }
+      }
+      return nonZeroIntersect;
+    }
 
+    /**
+     * Convert to
+     * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode}
+     * protobuf.
+     */
     public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
       ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
         ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
@@ -236,17 +291,6 @@ public class ProcedureStoreTracker {
       return builder.build();
     }
 
-    public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data)
{
-      long start = data.getStartId();
-      int size = data.getUpdatedCount();
-      long[] updated = new long[size];
-      long[] deleted = new long[size];
-      for (int i = 0; i < size; ++i) {
-        updated[i] = data.getUpdated(i);
-        deleted[i] = data.getDeleted(i);
-      }
-      return new BitSetNode(start, updated, deleted);
-    }
 
     // ========================================================================
     //  Grow/Merge Helpers
@@ -378,15 +422,15 @@ public class ProcedureStoreTracker {
       int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
       long value = (1L << bitmapIndex);
 
+      updated[wordIndex] |= value;
       if (isDeleted) {
-        updated[wordIndex] |= value;
         deleted[wordIndex] |= value;
       } else {
-        updated[wordIndex] |= value;
         deleted[wordIndex] &= ~value;
       }
     }
 
+
     // ========================================================================
     //  Helpers
     // ========================================================================
@@ -405,6 +449,28 @@ public class ProcedureStoreTracker {
     }
   }
 
+  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf)
+      throws IOException {
+    reset();
+    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList())
{
+      final BitSetNode node = new BitSetNode(protoNode);
+      map.put(node.getStart(), node);
+    }
+  }
+
+  /**
+   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
+   */
+  public void resetTo(ProcedureStoreTracker tracker) {
+    this.partial = tracker.partial;
+    this.minUpdatedProcId = tracker.minUpdatedProcId;
+    this.maxUpdatedProcId = tracker.maxUpdatedProcId;
+    this.keepDeletes = tracker.keepDeletes;
+    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
+      map.put(entry.getKey(), new BitSetNode(entry.getValue()));
+    }
+  }
+
   public void insert(long procId) {
     BitSetNode node = getOrCreateNode(procId);
     node.update(procId);
@@ -531,7 +597,7 @@ public class ProcedureStoreTracker {
    */
   public boolean isEmpty() {
     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
-      if (entry.getValue().isEmpty() == false) {
+      if (!entry.getValue().isEmpty()) {
         return false;
       }
     }
@@ -543,7 +609,7 @@ public class ProcedureStoreTracker {
    */
   public boolean isUpdated() {
     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
-      if (entry.getValue().isUpdated() == false) {
+      if (!entry.getValue().isUpdated()) {
         return false;
       }
     }
@@ -654,33 +720,52 @@ public class ProcedureStoreTracker {
       entry.getValue().dump();
     }
   }
+  /**
+   * Iterates over
+   * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other}
+   * tracker.
+   * @return true if tracker changed, i.e. some procedure from {@code other} were subtracted
from
+   * current tracker.
+   */
+  public boolean subtract(ProcedureStoreTracker other) {
+    // Can not intersect partial bitmap.
+    assert !partial && !other.partial;
+    boolean nonZeroIntersect = false;
+    for (Map.Entry<Long, BitSetNode> currentEntry : map.entrySet()) {
+      BitSetNode currentBitSetNode = currentEntry.getValue();
+      Map.Entry<Long, BitSetNode> otherTrackerEntry = other.map.floorEntry(currentEntry.getKey());
+      if (otherTrackerEntry == null  // No node in other map with key <= currentEntry.getKey().
+          // First entry in other map doesn't intersect with currentEntry.
+          || otherTrackerEntry.getValue().getEnd() < currentEntry.getKey()) {
+        otherTrackerEntry = other.map.ceilingEntry(currentEntry.getKey());
+        if (otherTrackerEntry == null || !currentBitSetNode.contains(otherTrackerEntry.getKey()))
{
+          // No node in other map intersects with currentBitSetNode's range.
+          continue;
+        }
+      }
+      do {
+        nonZeroIntersect |= currentEntry.getValue().subtract(otherTrackerEntry.getValue());
+        otherTrackerEntry = other.map.higherEntry(otherTrackerEntry.getKey());
+      } while (otherTrackerEntry != null && currentBitSetNode.contains(otherTrackerEntry.getKey()));
+    }
+    return nonZeroIntersect;
+  }
+
+  // ========================================================================
+  //  Convert to/from Protocol Buffer.
+  // ========================================================================
 
   /**
    * Builds
    * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
-   * protocol buffer from current state, serializes it and writes to the {@code stream}.
+   * protocol buffer from current state.
    */
-  public void writeTo(final OutputStream stream) throws IOException {
+  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
     ProcedureProtos.ProcedureStoreTracker.Builder builder =
         ProcedureProtos.ProcedureStoreTracker.newBuilder();
     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
       builder.addNode(entry.getValue().convert());
     }
-    builder.build().writeDelimitedTo(stream);
-  }
-
-  /**
-   * Reads serialized
-   * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
-   * protocol buffer from the {@code stream}, and use it to build the state.
-   */
-  public void readFrom(final InputStream stream) throws IOException {
-    reset();
-    final ProcedureProtos.ProcedureStoreTracker data =
-        ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
-    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList())
{
-      final BitSetNode node = BitSetNode.convert(protoNode);
-      map.put(node.getStart(), node);
-    }
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 097cd29..99e7a7e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
 
@@ -50,6 +51,12 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile>
{
   private long logSize;
   private long timestamp;
 
+  public ProcedureStoreTracker getTracker() {
+    return tracker;
+  }
+
+  private final ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+
   public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
     this.fs = fs;
     this.logFile = logStatus.getPath();
@@ -88,16 +95,22 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile>
{
     }
   }
 
-  public void readTracker(ProcedureStoreTracker tracker) throws IOException {
+  public void readTracker() throws IOException {
     ProcedureWALTrailer trailer = readTrailer();
     try {
       stream.seek(trailer.getTrackerPos());
-      tracker.readFrom(stream);
+      final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf =
+          ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
+      tracker.resetToProto(trackerProtoBuf);
     } finally {
       stream.seek(startPos);
     }
   }
 
+  public void updateLocalTracker(ProcedureStoreTracker tracker) {
+    this.tracker.resetTo(tracker);
+  }
+
   public void close() {
     if (stream == null) return;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 02b2db9..775ec11 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -123,7 +123,7 @@ public final class ProcedureWALFormat {
       .build().writeDelimitedTo(stream);
 
     // Write Tracker
-    tracker.writeTo(stream);
+    tracker.toProto().writeDelimitedTo(stream);
 
     stream.write(TRAILER_VERSION);
     StreamUtils.writeLong(stream, TRAILER_MAGIC);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 280a9c2..0cfe4b0 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -29,6 +29,8 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
@@ -104,7 +106,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       "hbase.procedure.store.wal.sync.stats.count";
   private static final int DEFAULT_SYNC_STATS_COUNT = 10;
 
-  private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
+  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
@@ -246,7 +248,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     // Close the writer
-    closeStream();
+    closeCurrentLogStream();
 
     // Close the old logs
     // they should be already closed, this is just in case the load fails
@@ -776,6 +778,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
+  @VisibleForTesting void removeInactiveLogsForTesting() throws Exception {
+    lock.lock();
+    try {
+      removeInactiveLogs();
+    } finally  {
+      lock.unlock();
+    }
+  }
+
   private void periodicRoll() throws IOException {
     if (storeTracker.isEmpty()) {
       if (LOG.isTraceEnabled()) {
@@ -853,7 +864,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       return false;
     }
 
-    closeStream();
+    closeCurrentLogStream();
 
     storeTracker.resetUpdates();
     stream = newStream;
@@ -869,12 +880,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return true;
   }
 
-  private void closeStream() {
+  private void closeCurrentLogStream() {
     try {
       if (stream != null) {
         try {
           ProcedureWALFile log = logs.getLast();
           log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+          log.updateLocalTracker(storeTracker);
           long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
           log.addToSize(trailerSize);
         } catch (IOException e) {
@@ -892,14 +904,38 @@ public class WALProcedureStore extends ProcedureStoreBase {
   // ==========================================================================
   //  Log Files cleaner helpers
   // ==========================================================================
-  private void removeInactiveLogs() {
-    // Verify if the ProcId of the first oldest is still active. if not remove the file.
-    while (logs.size() > 1) {
+
+  /**
+   * Iterates over log files from latest (ignoring currently active one) to oldest, deleting
the
+   * ones which don't contain anything useful for recovery.
+   * @throws IOException
+   */
+  private void removeInactiveLogs() throws IOException {
+    // TODO: can we somehow avoid first iteration (starting from newest log) and still figure
out
+    // efficient way to cleanup old logs.
+    // Alternatively, a complex and maybe more efficient method would be using this iteration
to
+    // rewrite latest states of active procedures to a new log file and delete all old ones.
+    if (logs.size() <= 1) return;
+    ProcedureStoreTracker runningTracker = new ProcedureStoreTracker();
+    runningTracker.resetTo(storeTracker);
+    List<ProcedureWALFile> logsToBeDeleted = new ArrayList<>();
+    for (int i = logs.size() - 2; i >= 0; i--) {
+      ProcedureWALFile log = logs.get(i);
+      // If nothing was subtracted, delete the log file since it doesn't contain any useful
proc
+      // states.
+      if (!runningTracker.subtract(log.getTracker())) {
+        logsToBeDeleted.add(log);
+      }
+    }
+    // Delete the logs from oldest to newest and stop at first log that can't be deleted
to avoid
+    // holes in the log file sequence (for better debug capability).
+    while (true) {
       ProcedureWALFile log = logs.getFirst();
-      if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
+      if (logsToBeDeleted.contains(log)) {
+        removeLogFile(log);
+      } else {
         break;
       }
-      removeLogFile(log);
     }
   }
 
@@ -1011,7 +1047,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
         final Path logPath = logFiles[i].getPath();
         leaseRecovery.recoverFileLease(fs, logPath);
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
-
         ProcedureWALFile log = initOldLog(logFiles[i]);
         if (log != null) {
           this.logs.add(log);
@@ -1025,19 +1060,19 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   private void initTrackerFromOldLogs() {
     // TODO: Load the most recent tracker available
-    if (!logs.isEmpty()) {
-      ProcedureWALFile log = logs.getLast();
-      try {
-        log.readTracker(storeTracker);
-      } catch (IOException e) {
-        LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
-        // try the next one...
-        storeTracker.reset();
-        storeTracker.setPartialFlag(true);
-      }
+    if (logs.isEmpty()) return;
+    ProcedureWALFile log = logs.getLast();
+    if (log.getTracker() != null) {
+      storeTracker.resetTo(log.getTracker());
+    } else {
+      storeTracker.reset();
+      storeTracker.setPartialFlag(true);
     }
   }
 
+  /**
+   * Loads given log file and it's tracker.
+   */
   private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
     ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
@@ -1069,6 +1104,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
         return null;
       }
     }
+    try {
+      log.readTracker();
+    } catch (IOException e) {
+      LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
+    }
     return log;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 55ece8a..76fd2c5 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -238,4 +239,112 @@ public class TestProcedureStoreTracker {
       }
     }
   }
+
+  boolean isDeleted(ProcedureStoreTracker n, long procId) {
+    return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
+  }
+
+  boolean isDeleted(BitSetNode n, long procId) {
+    return n.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
+  }
+
+  /**
+   * @param active list of active proc ids. To mark them as non-deleted, since by default
a proc
+   *               id is always marked deleted.
+   */
+  ProcedureStoreTracker buildTracker(long[] active, long[] updated, long[] deleted) {
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    for (long i : active) {
+      tracker.insert(i);
+    }
+    tracker.resetUpdates();
+    for (long i : updated) {
+      tracker.update(i);
+    }
+    for (long i : deleted) {
+      tracker.delete(i);
+    }
+    return tracker;
+  }
+
+  /**
+   * @param active list of active proc ids. To mark them as non-deleted, since by default
a proc
+   *               id is always marked deleted.
+   */
+  BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) {
+    BitSetNode bitSetNode = new BitSetNode(0L, false);
+    for (long i : active) {
+      bitSetNode.update(i);
+    }
+    bitSetNode.resetUpdates();
+    for (long i : updated) {
+      bitSetNode.update(i);
+    }
+    for (long i : deleted) {
+      bitSetNode.delete(i);
+    }
+    return bitSetNode;
+  }
+
+  @Test
+  public void testBitSetNodeSubtract() {
+    // 1 not updated in n2, nothing to subtract
+    BitSetNode n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ });
+    BitSetNode n2 = buildBitSetNode(new long[]{ 1L }, new long[]{}, new long[]{});
+    assertFalse(n1.subtract(n2));
+
+    // 1 updated in n2, and not deleted in n1, should subtract.
+    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
+    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
+    assertTrue(n1.subtract(n2));
+
+    // 1 updated in n2, but deleted in n1, should not subtract
+    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
+    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
+    assertFalse(n1.subtract(n2));
+
+    // 1 updated in n2, but not deleted in n1, should subtract.
+    n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{});
+    n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L });
+    assertTrue(n1.subtract(n2));
+
+    // all four cases together.
+    n1 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L  }, new long[]{ 0L, 10L, 20L, 30L
 },
+        new long[]{ 20L });
+    n2 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L  }, new long[]{ 0L, 20L, 30L },
+        new long[]{ 0L });
+    assertTrue(n1.subtract(n2));
+  }
+
+  @Test
+  // The structure is same as testBitSetNodeSubtract() but the ids are bigger so that internally
+  // there are many BitSetNodes.
+  public void testTrackerSubtract() {
+    // not updated in n2, nothing to subtract
+    ProcedureStoreTracker n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L
},
+        new long[]{ });
+    ProcedureStoreTracker n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{}, new long[]{});
+    assertFalse(n1.subtract(n2));
+
+    // updated in n2, and not deleted in n1, should subtract.
+    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
+    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
+    assertTrue(n1.subtract(n2));
+
+    // updated in n2, but also deleted in n1, should not subtract
+    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L
});
+    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{});
+    assertFalse(n1.subtract(n2));
+
+    // updated in n2, but not deleted in n1, should subtract.
+    n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{});
+    n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{ 1L, 1000L });
+    assertFalse(n1.subtract(n2));
+
+    n1 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 100L, 200L, 300L
},
+        new long[]{ 200L });
+    n2 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 200L, 300L },
+        new long[]{ 0L });
+    assertTrue(n1.subtract(n2));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ce882703/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index c02f846..e7f7c77 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -22,9 +22,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,6 +108,114 @@ public class TestWALProcedureStore {
     assertEquals(1, status.length);
   }
 
+  /**
+   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
+   */
+  @Test
+  public void trackersLoadedForAllOldLogs() throws Exception {
+    for (int i = 0; i <= 20; ++i) {
+      procStore.insert(new TestProcedure(i), null);
+      if (i > 0 && (i % 5) == 0) {
+        LoadCounter loader = new LoadCounter();
+        storeRestart(loader);
+      }
+    }
+    assertEquals(5, procStore.getActiveLogs().size());
+    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
+      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
+      assertTrue(tracker != null && !tracker.isEmpty());
+    }
+  }
+
+  @Test
+  public void testWalCleanerSequentialClean() throws Exception {
+    int NUM = 5;
+    List<Procedure> procs = new ArrayList<>();
+    ArrayList<ProcedureWALFile> logs = null;
+    // Insert procedures and roll wal after every insert.
+    for (int i = 0; i < NUM; i++) {
+      procs.add(new TestSequentialProcedure());
+      procStore.insert(procs.get(i), null);
+      procStore.rollWriterForTesting();
+      logs = procStore.getActiveLogs();
+      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
+    }
+
+    // Delete procedures in sequential order make sure that only the corresponding wal is
deleted
+    // from logs list.
+    int[] deleteOrder = new int[]{ 0, 1, 2, 3, 4};
+    for (int i = 0; i < deleteOrder.length; i++) {
+      procStore.delete(procs.get(deleteOrder[i]).getProcId());
+      procStore.removeInactiveLogsForTesting();
+      assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
+      assertEquals(procStore.getActiveLogs().size(), NUM - i );
+    }
+  }
+
+
+  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files
if
+  // they are in the starting of the list.
+  @Test
+  public void testWalCleanerNoHoles() throws Exception {
+    int NUM = 5;
+    List<Procedure> procs = new ArrayList<>();
+    ArrayList<ProcedureWALFile> logs = null;
+    // Insert procedures and roll wal after every insert.
+    for (int i = 0; i < NUM; i++) {
+      procs.add(new TestSequentialProcedure());
+      procStore.insert(procs.get(i), null);
+      procStore.rollWriterForTesting();
+      logs = procStore.getActiveLogs();
+      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
+    }
+
+    for (int i = 1; i < NUM; i++) {
+      procStore.delete(procs.get(i).getProcId());
+    }
+    assertEquals(procStore.getActiveLogs().size(), NUM + 1);
+    procStore.delete(procs.get(0).getProcId());
+    assertEquals(procStore.getActiveLogs().size(), 1);
+  }
+
+  @Test
+  public void testWalCleanerUpdates() throws Exception {
+    TestSequentialProcedure p1 = new TestSequentialProcedure(),
+        p2 = new TestSequentialProcedure();
+    procStore.insert(p1, null);
+    procStore.insert(p2, null);
+    procStore.rollWriterForTesting();
+    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
+    procStore.update(p1);
+    procStore.rollWriterForTesting();
+    procStore.update(p2);
+    procStore.rollWriterForTesting();
+    procStore.removeInactiveLogsForTesting();
+    assertFalse(procStore.getActiveLogs().contains(firstLog));
+  }
+
+  @Test
+  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
+    TestSequentialProcedure p1 = new TestSequentialProcedure(),
+        p2 = new TestSequentialProcedure();
+    procStore.insert(p1, null);
+    procStore.insert(p2, null);
+    procStore.rollWriterForTesting();  // generates first log with p1 + p2
+    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
+    procStore.update(p2);
+    procStore.rollWriterForTesting();  // generates second log with p2
+    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
+    procStore.update(p2);
+    procStore.rollWriterForTesting();  // generates third log with p2
+    procStore.removeInactiveLogsForTesting();  // Shouldn't remove 2nd log.
+    assertEquals(4, procStore.getActiveLogs().size());
+    procStore.update(p1);
+    procStore.rollWriterForTesting();  // generates fourth log with p1
+    procStore.removeInactiveLogsForTesting();  // Should remove first two logs.
+    assertEquals(3, procStore.getActiveLogs().size());
+    assertFalse(procStore.getActiveLogs().contains(log1));
+    assertFalse(procStore.getActiveLogs().contains(log2));
+  }
+
   @Test
   public void testEmptyLogLoad() throws Exception {
     LoadCounter loader = new LoadCounter();


Mime
View raw message