hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [2/4] hbase git commit: HBASE-14947 WALProcedureStore improvements
Date Tue, 15 Dec 2015 20:53:24 GMT
HBASE-14947 WALProcedureStore improvements


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8698da2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8698da2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8698da2

Branch: refs/heads/branch-1.2
Commit: e8698da2a4b6f7b419478485617ec22933f22f3c
Parents: ef19c21
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Tue Dec 15 10:15:18 2015 -0800
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Tue Dec 15 12:52:45 2015 -0800

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java |  21 +-
 .../procedure2/store/wal/WALProcedureStore.java | 294 ++++++++++---------
 .../hbase/procedure2/TestProcedureRecovery.java |   4 +-
 .../store/TestProcedureStoreTracker.java        |  35 +--
 .../store/wal/TestWALProcedureStore.java        |  51 +++-
 5 files changed, 203 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e8698da2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 8516f61..6823288 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -27,7 +27,6 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 
 /**
@@ -356,25 +355,19 @@ public class ProcedureStoreTracker {
     }
   }
 
-  public void insert(final Procedure proc, final Procedure[] subprocs) {
-    insert(proc.getProcId());
-    if (subprocs != null) {
-      for (int i = 0; i < subprocs.length; ++i) {
-        insert(subprocs[i].getProcId());
-      }
-    }
-  }
-
-  public void update(final Procedure proc) {
-    update(proc.getProcId());
-  }
-
   public void insert(long procId) {
     BitSetNode node = getOrCreateNode(procId);
     node.update(procId);
     trackProcIds(procId);
   }
 
+  public void insert(final long procId, final long[] subProcIds) {
+    update(procId);
+    for (int i = 0; i < subProcIds.length; ++i) {
+      insert(subProcIds[i]);
+    }
+  }
+
   public void update(long procId) {
     Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
     assert entry != null : "expected node to update procId=" + procId;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8698da2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index d73cb4f..d871e37 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -100,7 +100,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
-  private final AtomicLong inactiveLogsMaxId = new AtomicLong(0);
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
   private final Condition slotCond = lock.newCondition();
@@ -191,19 +190,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     LOG.info("Stopping the WAL Procedure Store");
-    if (lock.tryLock()) {
-      try {
-        waitCond.signalAll();
-        syncCond.signalAll();
-      } finally {
-        lock.unlock();
-      }
-    }
+    sendStopSignal();
 
     if (!abort) {
       try {
-        syncThread.join();
+        while (syncThread.isAlive()) {
+          sendStopSignal();
+          syncThread.join(250);
+        }
       } catch (InterruptedException e) {
+        LOG.warn("join interrupted", e);
         Thread.currentThread().interrupt();
       }
     }
@@ -220,6 +216,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
     logs.clear();
   }
 
+  private void sendStopSignal() {
+    if (lock.tryLock()) {
+      try {
+        waitCond.signalAll();
+        syncCond.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
   @Override
   public int getNumThreads() {
     return slots == null ? 0 : slots.length;
@@ -239,31 +246,36 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   @Override
   public void recoverLease() throws IOException {
-    LOG.info("Starting WAL Procedure Store lease recovery");
-    FileStatus[] oldLogs = getLogFiles();
-    while (isRunning()) {
-      // Get Log-MaxID and recover lease on old logs
-      flushLogId = initOldLogs(oldLogs);
-
-      // Create new state-log
-      if (!rollWriter(flushLogId + 1)) {
-        // someone else has already created this log
-        LOG.debug("someone else has already created log " + flushLogId);
-        continue;
-      }
+    lock.lock();
+    try {
+      LOG.info("Starting WAL Procedure Store lease recovery");
+      FileStatus[] oldLogs = getLogFiles();
+      while (isRunning()) {
+        // Get Log-MaxID and recover lease on old logs
+        flushLogId = initOldLogs(oldLogs);
+
+        // Create new state-log
+        if (!rollWriter(flushLogId + 1)) {
+          // someone else has already created this log
+          LOG.debug("someone else has already created log " + flushLogId);
+          continue;
+        }
 
-      // We have the lease on the log
-      oldLogs = getLogFiles();
-      if (getMaxLogId(oldLogs) > flushLogId) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
+        // We have the lease on the log
+        oldLogs = getLogFiles();
+        if (getMaxLogId(oldLogs) > flushLogId) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
+          }
+          logs.getLast().removeFile();
+          continue;
         }
-        logs.getLast().removeFile();
-        continue;
-      }
 
-      LOG.info("Lease acquired for flushLogId: " + flushLogId);
-      break;
+        LOG.info("Lease acquired for flushLogId: " + flushLogId);
+        break;
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -335,18 +347,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     ByteSlot slot = acquireSlot();
-    long logId = -1;
     try {
       // Serialize the insert
+      long[] subProcIds = null;
       if (subprocs != null) {
         ProcedureWALFormat.writeInsert(slot, proc, subprocs);
+        subProcIds = new long[subprocs.length];
+        for (int i = 0; i < subprocs.length; ++i) {
+          subProcIds[i] = subprocs[i].getProcId();
+        }
       } else {
         assert !proc.hasParent();
         ProcedureWALFormat.writeInsert(slot, proc);
       }
 
       // Push the transaction data and wait until it is persisted
-      pushData(slot);
+      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
     } catch (IOException e) {
       // We are not able to serialize the procedure.
       // this is a code error, and we are not able to go on.
@@ -356,14 +372,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
     } finally {
       releaseSlot(slot);
     }
-
-    // Update the store tracker
-    synchronized (storeTracker) {
-      storeTracker.insert(proc, subprocs);
-      if (logId == flushLogId) {
-        checkAndTryRoll();
-      }
-    }
   }
 
   @Override
@@ -373,13 +381,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     ByteSlot slot = acquireSlot();
-    long logId = -1;
     try {
       // Serialize the update
       ProcedureWALFormat.writeUpdate(slot, proc);
 
       // Push the transaction data and wait until it is persisted
-      logId = pushData(slot);
+      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
     } catch (IOException e) {
       // We are not able to serialize the procedure.
       // this is a code error, and we are not able to go on.
@@ -388,20 +395,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
     } finally {
       releaseSlot(slot);
     }
-
-    // Update the store tracker
-    boolean removeOldLogs = false;
-    synchronized (storeTracker) {
-      storeTracker.update(proc);
-      if (logId == flushLogId) {
-        removeOldLogs = storeTracker.isUpdated();
-        checkAndTryRoll();
-      }
-    }
-
-    if (removeOldLogs) {
-      setInactiveLogsMaxId(logId - 1);
-    }
   }
 
   @Override
@@ -411,13 +404,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     ByteSlot slot = acquireSlot();
-    long logId = -1;
     try {
       // Serialize the delete
       ProcedureWALFormat.writeDelete(slot, procId);
 
       // Push the transaction data and wait until it is persisted
-      logId = pushData(slot);
+      pushData(PushType.DELETE, slot, procId, null);
     } catch (IOException e) {
       // We are not able to serialize the procedure.
       // this is a code error, and we are not able to go on.
@@ -426,22 +418,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
     } finally {
       releaseSlot(slot);
     }
-
-    boolean removeOldLogs = false;
-    synchronized (storeTracker) {
-      storeTracker.delete(procId);
-      if (logId == flushLogId) {
-        if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
-          removeOldLogs = checkAndTryRoll();
-        } else {
-          checkAndTryRoll();
-        }
-      }
-    }
-
-    if (removeOldLogs) {
-      setInactiveLogsMaxId(logId);
-    }
   }
 
   private ByteSlot acquireSlot() {
@@ -454,7 +430,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
     slotsCache.offer(slot);
   }
 
-  private long pushData(final ByteSlot slot) {
+  private enum PushType { INSERT, UPDATE, DELETE };
+
+  private long pushData(final PushType type, final ByteSlot slot,
+      final long procId, final long[] subProcIds) {
     if (!isRunning()) {
       throw new RuntimeException("the store must be running before inserting data");
     }
@@ -481,6 +460,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
       }
 
+      updateStoreTracker(type, procId, subProcIds);
       slots[slotIndex++] = slot;
       logId = flushLogId;
 
@@ -509,20 +489,29 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return logId;
   }
 
-  private boolean isSyncAborted() {
-    return syncException.get() != null;
+  private void updateStoreTracker(final PushType type,
+      final long procId, final long[] subProcIds) {
+    switch (type) {
+      case INSERT:
+        if (subProcIds == null) {
+          storeTracker.insert(procId);
+        } else {
+          storeTracker.insert(procId, subProcIds);
+        }
+        break;
+      case UPDATE:
+        storeTracker.update(procId);
+        break;
+      case DELETE:
+        storeTracker.delete(procId);
+        break;
+      default:
+        throw new RuntimeException("invalid push type " + type);
+    }
   }
 
-  protected void periodicRoll() throws IOException {
-    long logId;
-    boolean removeOldLogs;
-    synchronized (storeTracker) {
-      logId = flushLogId;
-      removeOldLogs = storeTracker.isEmpty();
-    }
-    if (checkAndTryRoll() && removeOldLogs) {
-      setInactiveLogsMaxId(logId);
-    }
+  private boolean isSyncAborted() {
+    return syncException.get() != null;
   }
 
   private void syncLoop() throws Throwable {
@@ -534,7 +523,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           // Wait until new data is available
           if (slotIndex == 0) {
             if (!loading.get()) {
-              removeInactiveLogs();
+              periodicRoll();
             }
 
             if (LOG.isTraceEnabled()) {
@@ -547,7 +536,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
             waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
             if (slotIndex == 0) {
               // no data.. probably a stop() or a periodic roll
-              periodicRoll();
               continue;
             }
           }
@@ -560,13 +548,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
           long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
           if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
             float rollSec = getMillisFromLastRoll() / 1000.0f;
-            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s/sec",
+            LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)",
                       StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
                       StringUtils.humanSize(totalSynced.get()),
                       StringUtils.humanSize(totalSynced.get() / rollSec)));
           }
 
-
           inSync.set(true);
           totalSynced.addAndGet(syncSlots());
           slotIndex = 0;
@@ -639,8 +626,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return totalSynced;
   }
 
-  @VisibleForTesting
-  public boolean rollWriterOrDie() {
+  private boolean rollWriterOrDie() {
     for (int i = 1; i <= rollRetries; ++i) {
       try {
         if (rollWriter()) {
@@ -656,17 +642,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
     throw new RuntimeException("unable to roll the log");
   }
 
-  protected boolean checkAndTryRoll() {
-    if (!isRunning()) return false;
-
-    if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
-      try {
-        return rollWriter();
-      } catch (IOException e) {
-        LOG.warn("Unable to roll the log", e);
-      }
+  private boolean tryRollWriter() {
+    try {
+      return rollWriter();
+    } catch (IOException e) {
+      LOG.warn("Unable to roll the log", e);
+      return false;
     }
-    return false;
   }
 
   private long getMillisToNextPeriodicRoll() {
@@ -680,7 +662,52 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return (System.currentTimeMillis() - lastRollTs.get());
   }
 
-  protected boolean rollWriter() throws IOException {
+  @VisibleForTesting
+  protected void periodicRollForTesting() throws IOException {
+    lock.lock();
+    try {
+      periodicRoll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  protected boolean rollWriterForTesting() throws IOException {
+    lock.lock();
+    try {
+      return rollWriter();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void periodicRoll() throws IOException {
+    if (storeTracker.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("no active procedures");
+      }
+      tryRollWriter();
+      removeAllLogs(flushLogId - 1);
+    } else {
+      if (storeTracker.isUpdated()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("all the active procedures are in the latest log");
+        }
+        removeAllLogs(flushLogId - 1);
+      }
+
+      // if the log size has exceeded the roll threshold
+      // or the periodic roll timeout is expired, try to roll the wal.
+      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
+        tryRollWriter();
+      }
+
+      removeInactiveLogs();
+    }
+  }
+
+  private boolean rollWriter() throws IOException {
     // Create new state-log
     if (!rollWriter(flushLogId + 1)) {
       LOG.warn("someone else has already created log " + flushLogId);
@@ -701,6 +728,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   private boolean rollWriter(final long logId) throws IOException {
     assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
+    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();
 
     ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
       .setVersion(ProcedureWALFormat.HEADER_VERSION)
@@ -730,20 +758,16 @@ public class WALProcedureStore extends ProcedureStoreBase {
       newStream.close();
       return false;
     }
-    lock.lock();
-    try {
-      closeStream();
-      synchronized (storeTracker) {
-        storeTracker.resetUpdates();
-      }
-      stream = newStream;
-      flushLogId = logId;
-      totalSynced.set(0);
-      lastRollTs.set(System.currentTimeMillis());
-      logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
-    } finally {
-      lock.unlock();
-    }
+
+    closeStream();
+
+    storeTracker.resetUpdates();
+    stream = newStream;
+    flushLogId = logId;
+    totalSynced.set(0);
+    lastRollTs.set(System.currentTimeMillis());
+    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Roll new state log: " + logId);
     }
@@ -754,11 +778,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
     try {
       if (stream != null) {
         try {
-          synchronized (storeTracker) {
-            ProcedureWALFile log = logs.getLast();
-            log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
-            ProcedureWALFormat.writeTrailer(stream, storeTracker);
-          }
+          ProcedureWALFile log = logs.getLast();
+          log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+          ProcedureWALFormat.writeTrailer(stream, storeTracker);
         } catch (IOException e) {
           LOG.warn("Unable to write the trailer: " + e.getMessage());
         }
@@ -774,30 +796,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
   // ==========================================================================
   //  Log Files cleaner helpers
   // ==========================================================================
-  private void setInactiveLogsMaxId(long logId) {
-    long expect = 0;
-    while (!inactiveLogsMaxId.compareAndSet(expect, logId)) {
-      expect = inactiveLogsMaxId.get();
-      if (expect >= logId) {
-        break;
-      }
-    }
-  }
-
   private void removeInactiveLogs() {
-    long lastLogId = inactiveLogsMaxId.get();
-    if (lastLogId != 0) {
-      removeAllLogs(lastLogId);
-      inactiveLogsMaxId.compareAndSet(lastLogId, 0);
-    }
-
     // Verify if the ProcId of the first oldest is still active. if not remove the file.
     while (logs.size() > 1) {
       ProcedureWALFile log = logs.getFirst();
-      synchronized (storeTracker) {
-        if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
-          break;
-        }
+      if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
+        break;
       }
       removeLogFile(log);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8698da2/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 74e4675..ac57c15 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -314,7 +314,7 @@ public class TestProcedureRecovery {
   public void testRunningProcWithSameNonce() throws Exception {
     final long nonceGroup = 456;
     final long nonce = 33333;
-    Procedure proc = new TestMultiStepProcedure();
+    Procedure proc = new TestSingleStepProcedure();
     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
 
     // Restart (use a latch to prevent the step execution until we submitted proc2)
@@ -322,7 +322,7 @@ public class TestProcedureRecovery {
     procEnv.setWaitLatch(latch);
     restart();
     // Submit a procedure with the same nonce and expect the same procedure would return.
-    Procedure proc2 = new TestMultiStepProcedure();
+    Procedure proc2 = new TestSingleStepProcedure();
     long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
     latch.countDown();
     procEnv.setWaitLatch(null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8698da2/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 0dc9d92..26a94d4 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 
 import org.junit.Assert;
@@ -41,27 +40,6 @@ import static org.junit.Assert.fail;
 public class TestProcedureStoreTracker {
   private static final Log LOG = LogFactory.getLog(TestProcedureStoreTracker.class);
 
-  static class TestProcedure extends Procedure<Void> {
-    public TestProcedure(long procId) {
-      setProcId(procId);
-    }
-
-    @Override
-    protected Procedure[] execute(Void env) { return null; }
-
-    @Override
-    protected void rollback(Void env) { /* no-op */ }
-
-    @Override
-    protected boolean abort(Void env) { return false; }
-
-    @Override
-    protected void serializeStateData(final OutputStream stream) { /* no-op */ }
-
-    @Override
-    protected void deserializeStateData(final InputStream stream) { /* no-op */ }
-  }
-
   @Test
   public void testSeqInsertAndDelete() {
     ProcedureStoreTracker tracker = new ProcedureStoreTracker();
@@ -161,13 +139,10 @@ public class TestProcedureStoreTracker {
     ProcedureStoreTracker tracker = new ProcedureStoreTracker();
     assertTrue(tracker.isEmpty());
 
-    Procedure[] procs = new TestProcedure[] {
-      new TestProcedure(1), new TestProcedure(2), new TestProcedure(3),
-      new TestProcedure(4), new TestProcedure(5), new TestProcedure(6),
-    };
+    long[] procs = new long[] { 1, 2, 3, 4, 5, 6 };
 
-    tracker.insert(procs[0], null);
-    tracker.insert(procs[1], new Procedure[] { procs[2], procs[3], procs[4] });
+    tracker.insert(procs[0]);
+    tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] });
     assertFalse(tracker.isEmpty());
     assertTrue(tracker.isUpdated());
 
@@ -189,11 +164,11 @@ public class TestProcedureStoreTracker {
     assertTrue(tracker.isUpdated());
 
     for (int i = 0; i < 5; ++i) {
-      tracker.delete(procs[i].getProcId());
+      tracker.delete(procs[i]);
       assertFalse(tracker.isEmpty());
       assertTrue(tracker.isUpdated());
     }
-    tracker.delete(procs[5].getProcId());
+    tracker.delete(procs[5]);
     assertTrue(tracker.isEmpty());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8698da2/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 1265f3f..18ee05b 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -103,7 +103,7 @@ public class TestWALProcedureStore {
   @Test
   public void testEmptyRoll() throws Exception {
     for (int i = 0; i < 10; ++i) {
-      procStore.periodicRoll();
+      procStore.periodicRollForTesting();
     }
     FileStatus[] status = fs.listStatus(logDir);
     assertEquals(1, status.length);
@@ -215,14 +215,14 @@ public class TestWALProcedureStore {
       procStore.update(rootProcs[i-1]);
     }
     // insert root-child txn
-    procStore.rollWriter();
+    procStore.rollWriterForTesting();
     for (int i = 1; i <= rootProcs.length; i++) {
       TestProcedure b = new TestProcedure(rootProcs.length + i, i);
       rootProcs[i-1].addStackId(1);
       procStore.insert(rootProcs[i-1], new Procedure[] { b });
     }
     // insert child updates
-    procStore.rollWriter();
+    procStore.rollWriterForTesting();
     for (int i = 1; i <= rootProcs.length; i++) {
       procStore.update(new TestProcedure(rootProcs.length + i, i));
     }
@@ -230,9 +230,10 @@ public class TestWALProcedureStore {
     // Stop the store
     procStore.stop(false);
 
-    // Remove 4 byte from the trailer
+    // the first log was removed,
+    // we have insert-txn and updates in the others so everything is fine
     FileStatus[] logs = fs.listStatus(logDir);
-    assertEquals(3, logs.length);
+    assertEquals(Arrays.toString(logs), 2, logs.length);
     Arrays.sort(logs, new Comparator<FileStatus>() {
       @Override
       public int compare(FileStatus o1, FileStatus o2) {
@@ -240,15 +241,13 @@ public class TestWALProcedureStore {
       }
     });
 
-    // Remove the first log, we have insert-txn and updates in the others so everything is fine.
-    fs.delete(logs[0].getPath(), false);
     LoadCounter loader = new LoadCounter();
     storeRestart(loader);
     assertEquals(rootProcs.length * 2, loader.getLoadedCount());
     assertEquals(0, loader.getCorruptedCount());
 
-    // Remove the second log, we have lost any root/parent references
-    fs.delete(logs[1].getPath(), false);
+    // Remove the second log, we have lost all the root/parent references
+    fs.delete(logs[0].getPath(), false);
     loader.reset();
     storeRestart(loader);
     assertEquals(0, loader.getLoadedCount());
@@ -277,7 +276,7 @@ public class TestWALProcedureStore {
     b.addStackId(1);
     procStore.update(b);
 
-    procStore.rollWriter();
+    procStore.rollWriterForTesting();
 
     a.addStackId(2);
     procStore.update(a);
@@ -326,7 +325,7 @@ public class TestWALProcedureStore {
     b.addStackId(2);
     procStore.update(b);
 
-    procStore.rollWriter();
+    procStore.rollWriterForTesting();
 
     b.addStackId(3);
     procStore.update(b);
@@ -427,6 +426,36 @@ public class TestWALProcedureStore {
     assertEquals(1, procStore.getActiveLogs().size());
   }
 
+  @Test
+  public void testRollAndRemove() throws IOException {
+    // Insert something in the log
+    Procedure proc1 = new TestSequentialProcedure();
+    procStore.insert(proc1, null);
+
+    Procedure proc2 = new TestSequentialProcedure();
+    procStore.insert(proc2, null);
+
+    // roll the log, now we have 2
+    procStore.rollWriterForTesting();
+    assertEquals(2, procStore.getActiveLogs().size());
+
+    // everything will be up to date in the second log
+    // so we can remove the first one
+    procStore.update(proc1);
+    procStore.update(proc2);
+    assertEquals(1, procStore.getActiveLogs().size());
+
+    // roll the log, now we have 2
+    procStore.rollWriterForTesting();
+    assertEquals(2, procStore.getActiveLogs().size());
+
+    // remove everything active
+    // so we can remove all the logs
+    procStore.delete(proc1.getProcId());
+    procStore.delete(proc2.getProcId());
+    assertEquals(1, procStore.getActiveLogs().size());
+  }
+
   private void corruptLog(final FileStatus logFile, final long dropBytes)
       throws IOException {
     assertTrue(logFile.getLen() > dropBytes);


Mime
View raw message