Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7C781867D for ; Mon, 9 Nov 2015 19:02:48 +0000 (UTC) Received: (qmail 1171 invoked by uid 500); 9 Nov 2015 19:02:48 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 925 invoked by uid 500); 9 Nov 2015 19:02:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 868 invoked by uid 99); 9 Nov 2015 19:02:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Nov 2015 19:02:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0CD1FE08B3; Mon, 9 Nov 2015 19:02:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbertozzi@apache.org To: commits@hbase.apache.org Date: Mon, 09 Nov 2015 19:02:50 -0000 Message-Id: <11f427ac09fd47c3b6c6fe62133cc882@git.apache.org> In-Reply-To: <9c06fc29482d4fc1b3c63d7630ad4b12@git.apache.org> References: <9c06fc29482d4fc1b3c63d7630ad4b12@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] hbase git commit: HBASE-14712 Increase MasterProcWALs clean up granularity HBASE-14712 Increase MasterProcWALs clean up granularity Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1befa468 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1befa468 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1befa468 Branch: refs/heads/branch-1.2 Commit: 1befa4683dea7a21f8879c4c240fea076f501329 Parents: 520c02b Author: Matteo Bertozzi Authored: Mon Nov 9 09:33:05 2015 -0800 Committer: Matteo Bertozzi Committed: Mon Nov 9 09:52:47 2015 -0800 ---------------------------------------------------------------------- .../procedure2/store/ProcedureStoreTracker.java | 56 +++++++++++-- .../procedure2/store/wal/ProcedureWALFile.java | 15 ++++ .../store/wal/ProcedureWALFormatReader.java | 23 +++++- .../procedure2/store/wal/WALProcedureStore.java | 87 ++++++++++++++++---- .../store/TestProcedureStoreTracker.java | 31 +++++++ .../store/wal/TestWALProcedureStore.java | 65 +++++++++++++++ 6 files changed, 255 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 07fb026..8516f61 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -44,13 +44,16 @@ public class ProcedureStoreTracker { private boolean keepDeletes = false; private boolean partial = false; + private long minUpdatedProcId = Long.MAX_VALUE; + private long maxUpdatedProcId = Long.MIN_VALUE; + public enum DeleteState { YES, NO, MAYBE } public static class BitSetNode { private final static long WORD_MASK = 0xffffffffffffffffL; private final static int ADDRESS_BITS_PER_WORD = 6; private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD; + private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; private final boolean partial; private long[] updated; @@ -81,7 +84,7 @@ public class ProcedureStoreTracker { public BitSetNode(final long procId, final boolean partial) { start = alignDown(procId); - int count = 2; + int count = 1; updated = new long[count]; deleted = new long[count]; for (int i = 0; i < count; ++i) { @@ -141,8 +144,7 @@ public class ProcedureStoreTracker { public boolean isUpdated() { // TODO: cache the value for (int i = 0; i < updated.length; ++i) { - long deleteMask = ~deleted[i]; - if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) { + if ((updated[i] | deleted[i]) != WORD_MASK) { return false; } } @@ -171,6 +173,16 @@ public class ProcedureStoreTracker { } } + public void unsetPartialFlag() { + for (int i = 0; i < updated.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((updated[i] & (1L << j)) == 0) { + deleted[i] |= (1L << j); + } + } + } + } + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); @@ -360,6 +372,7 @@ public class ProcedureStoreTracker { public void insert(long procId) { BitSetNode node = getOrCreateNode(procId); node.update(procId); + trackProcIds(procId); } public void update(long procId) { @@ -369,6 +382,7 @@ public class ProcedureStoreTracker { BitSetNode node = entry.getValue(); assert node.contains(procId); node.update(procId); + trackProcIds(procId); } public void delete(long procId) { @@ -383,6 +397,21 @@ public class ProcedureStoreTracker { // TODO: RESET if (map.size() == 1) map.remove(entry.getKey()); } + + trackProcIds(procId); + } + + private void trackProcIds(long procId) { + minUpdatedProcId = Math.min(minUpdatedProcId, procId); + maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); + } + + public long getUpdatedMinProcId() { + return minUpdatedProcId; + } + + public long getUpdatedMaxProcId() { + return maxUpdatedProcId; } @InterfaceAudience.Private @@ -394,11 +423,12 @@ public class ProcedureStoreTracker { public void clear() { this.map.clear(); + resetUpdates(); } public DeleteState isDeleted(long procId) { Map.Entry entry = map.floorEntry(procId); - if (entry != null) { + if (entry != null && entry.getValue().contains(procId)) { BitSetNode node = entry.getValue(); DeleteState state = node.isDeleted(procId); return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; @@ -426,6 +456,11 @@ public class ProcedureStoreTracker { } public void setPartialFlag(boolean isPartial) { + if (this.partial && !isPartial) { + for (Map.Entry entry : map.entrySet()) { + entry.getValue().unsetPartialFlag(); + } + } this.partial = isPartial; } @@ -447,10 +482,17 @@ public class ProcedureStoreTracker { return true; } + public boolean isTracking(long minId, long maxId) { + // TODO: we can make it more precise, instead of looking just at the block + return map.floorEntry(minId) != null || map.floorEntry(maxId) != null; + } + public void resetUpdates() { for (Map.Entry entry : map.entrySet()) { entry.getValue().resetUpdates(); } + minUpdatedProcId = Long.MAX_VALUE; + maxUpdatedProcId = Long.MIN_VALUE; } public void undeleteAll() { @@ -527,6 +569,8 @@ public class ProcedureStoreTracker { public void dump() { System.out.println("map " + map.size()); + System.out.println("isUpdated " + isUpdated()); + System.out.println("isEmpty " + isEmpty()); for (Map.Entry entry : map.entrySet()) { entry.getValue().dump(); } @@ -550,4 +594,4 @@ public class ProcedureStoreTracker { map.put(node.getStart(), node); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 859b3cb..4f8a493 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -46,6 +46,8 @@ public class ProcedureWALFile implements Comparable { private FileSystem fs; private Path logFile; private long startPos; + private long minProcId; + private long maxProcId; public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) { this.fs = fs; @@ -127,6 +129,19 @@ public class ProcedureWALFile implements Comparable { fs.delete(logFile, false); } + public void setProcIds(long minId, long maxId) { + this.minProcId = minId; + this.maxProcId = maxId; + } + + public long getMinProcId() { + return minProcId; + } + + public long getMaxProcId() { + return maxProcId; + } + @Override public int compareTo(final ProcedureWALFile other) { long diff = header.getLogId() - other.header.getLogId(); http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 76c0554..fa4fccf 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -150,8 +150,8 @@ public class ProcedureWALFormatReader { LOG.info("No active entry found in state log " + log + ". removing it"); loader.removeLog(log); } else { + log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); procedureMap.mergeTail(localProcedureMap); - //if (hasFastStartSupport) { // TODO: Some procedure may be already runnables (see readInitEntry()) // (we can also check the "update map" in the log trackers) @@ -321,6 +321,10 @@ public class ProcedureWALFormatReader { // pending unlinked children (root not present yet) private Entry childUnlinkedHead; + // Track ProcId range + private long minProcId = Long.MAX_VALUE; + private long maxProcId = Long.MIN_VALUE; + public WalProcedureMap(int size) { procedureMap = new Entry[size]; replayOrderHead = null; @@ -330,6 +334,7 @@ public class ProcedureWALFormatReader { } public void add(ProcedureProtos.Procedure procProto) { + trackProcIds(procProto.getProcId()); Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); boolean isNew = entry.proto == null; entry.proto = procProto; @@ -345,6 +350,7 @@ public class ProcedureWALFormatReader { } public boolean remove(long procId) { + trackProcIds(procId); Entry entry = removeFromMap(procId); if (entry != null) { unlinkFromReplayList(entry); @@ -354,6 +360,19 @@ public class ProcedureWALFormatReader { return false; } + private void trackProcIds(long procId) { + minProcId = Math.min(minProcId, procId); + maxProcId = Math.max(maxProcId, procId); + } + + public long getMinProcId() { + return minProcId; + } + + public long getMaxProcId() { + return maxProcId; + } + public boolean contains(long procId) { return getProcedure(procId) != null; } @@ -370,6 +389,8 @@ public class ProcedureWALFormatReader { replayOrderTail = null; rootHead = null; childUnlinkedHead = null; + minProcId = Long.MAX_VALUE; + maxProcId = Long.MIN_VALUE; } /* http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 8764ff0..a3115f8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -100,6 +100,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final LinkedList logs = new LinkedList(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); + private final AtomicLong inactiveLogsMaxId = new AtomicLong(0); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); private final Condition slotCond = lock.newCondition(); @@ -225,6 +226,14 @@ public class WALProcedureStore extends ProcedureStoreBase { return storeTracker; } + public LinkedList getActiveLogs() { + return logs; + } + + public Set getCorruptedLogs() { + return corruptedLogs; + } + @Override public void recoverLease() throws IOException { LOG.info("Starting WAL Procedure Store lease recovery"); @@ -386,7 +395,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } if (removeOldLogs) { - removeAllLogs(logId - 1); + setInactiveLogsMaxId(logId - 1); } } @@ -426,7 +435,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } if (removeOldLogs) { - removeAllLogs(logId); + setInactiveLogsMaxId(logId); } } @@ -499,6 +508,18 @@ public class WALProcedureStore extends ProcedureStoreBase { return syncException.get() != null; } + protected void periodicRoll() throws IOException { + long logId; + boolean removeOldLogs; + synchronized (storeTracker) { + logId = flushLogId; + removeOldLogs = storeTracker.isEmpty(); + } + if (checkAndTryRoll() && removeOldLogs) { + setInactiveLogsMaxId(logId); + } + } + private void syncLoop() throws Throwable { inSync.set(false); lock.lock(); @@ -507,6 +528,8 @@ public class WALProcedureStore extends ProcedureStoreBase { try { // Wait until new data is available if (slotIndex == 0) { + removeInactiveLogs(); + if (LOG.isTraceEnabled()) { float rollTsSec = getMillisFromLastRoll() / 1000.0f; LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", @@ -516,8 +539,8 @@ public class WALProcedureStore extends ProcedureStoreBase { waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); if (slotIndex == 0) { - // no data.. probably a stop() - checkAndTryRoll(); + // no data.. probably a stop() or a periodic roll + periodicRoll(); continue; } } @@ -724,7 +747,11 @@ public class WALProcedureStore extends ProcedureStoreBase { try { if (stream != null) { try { - ProcedureWALFormat.writeTrailer(stream, storeTracker); + synchronized (storeTracker) { + ProcedureWALFile log = logs.getLast(); + log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + ProcedureWALFormat.writeTrailer(stream, storeTracker); + } } catch (IOException e) { LOG.warn("Unable to write the trailer: " + e.getMessage()); } @@ -737,21 +764,51 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private void removeAllLogs(long lastLogId) { - if (logs.size() <= 1) { - assert logs.size() == 1: "Expected at least one active log to be running."; - return; + // ========================================================================== + // Log Files cleaner helpers + // ========================================================================== + private void setInactiveLogsMaxId(long logId) { + long expect = 0; + while (!inactiveLogsMaxId.compareAndSet(expect, logId)) { + expect = inactiveLogsMaxId.get(); + if (expect >= logId) { + break; + } } + } + + private void removeInactiveLogs() { + long lastLogId = inactiveLogsMaxId.get(); + if (lastLogId != 0) { + removeAllLogs(lastLogId); + inactiveLogsMaxId.compareAndSet(lastLogId, 0); + } + + // Verify if the ProcId of the first oldest is still active. if not remove the file. + while (logs.size() > 1) { + ProcedureWALFile log = logs.getFirst(); + synchronized (storeTracker) { + if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { + break; + } + } + removeLogFile(log); + } + } + + private void removeAllLogs(long lastLogId) { + if (logs.size() <= 1) return; + if (LOG.isDebugEnabled()) { LOG.debug("Remove all state logs with ID less than " + lastLogId); } - do { + while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); if (lastLogId < log.getLogId()) { break; } removeLogFile(log); - } while(!logs.isEmpty()); + } } private boolean removeLogFile(final ProcedureWALFile log) { @@ -761,6 +818,10 @@ public class WALProcedureStore extends ProcedureStoreBase { } log.removeFile(); logs.remove(log); + LOG.info("Remove log: " + log); + LOG.info("Removed logs: " + logs); + if (logs.size() == 0) { LOG.error("Expected at least one log"); } + assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { LOG.error("Unable to remove log: " + log, e); return false; @@ -768,10 +829,6 @@ public class WALProcedureStore extends ProcedureStoreBase { return true; } - public Set getCorruptedLogs() { - return corruptedLogs; - } - // ========================================================================== // FileSystem Log Files helpers // ========================================================================== http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 424d9ce..0dc9d92 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.InputStream; import java.io.OutputStream; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -123,6 +124,36 @@ public class TestProcedureStoreTracker { assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(579)); assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(577)); assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(580)); + + tracker.setDeleted(579, true); + tracker.setPartialFlag(false); + assertTrue(tracker.isEmpty()); + } + + @Test + public void testIsTracking() { + long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}}; + long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}}; + + ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + for (int i = 0; i < procIds.length; ++i) { + long[] seq = procIds[i]; + tracker.insert(seq[0]); + tracker.insert(seq[1]); + } + + for (int i = 0; i < procIds.length; ++i) { + long[] check = checkIds[i]; + long[] seq = procIds[i]; + assertTrue(tracker.isTracking(seq[0], seq[1])); + assertTrue(tracker.isTracking(check[0], check[1])); + tracker.delete(seq[0]); + tracker.delete(seq[1]); + assertFalse(tracker.isTracking(seq[0], seq[1])); + assertFalse(tracker.isTracking(check[0], check[1])); + } + + assertTrue(tracker.isEmpty()); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/1befa468/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index c2e6a77..d98eddd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -29,6 +29,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.HashSet; import java.util.Set; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -98,6 +100,15 @@ public class TestWALProcedureStore { } @Test + public void testEmptyRoll() throws Exception { + for (int i = 0; i < 10; ++i) { + procStore.periodicRoll(); + } + FileStatus[] status = fs.listStatus(logDir); + assertEquals(1, status.length); + } + + @Test public void testEmptyLogLoad() throws Exception { LoadCounter loader = new LoadCounter(); storeRestart(loader); @@ -355,6 +366,60 @@ public class TestWALProcedureStore { }); } + @Test + public void testInsertUpdateDelete() throws Exception { + final int NTHREAD = 2; + + procStore.stop(false); + fs.delete(logDir, true); + + org.apache.hadoop.conf.Configuration conf = + new org.apache.hadoop.conf.Configuration(htu.getConfiguration()); + conf.setBoolean("hbase.procedure.store.wal.use.hsync", false); + conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000); + conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024); + + fs.mkdirs(logDir); + procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + procStore.start(NTHREAD); + procStore.recoverLease(); + + final long LAST_PROC_ID = 9999; + final Thread[] thread = new Thread[NTHREAD]; + final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100)); + for (int i = 0; i < thread.length; ++i) { + thread[i] = new Thread() { + @Override + public void run() { + Random rand = new Random(); + TestProcedure proc; + do { + proc = new TestProcedure(procCounter.addAndGet(1)); + // Insert + procStore.insert(proc, null); + // Update + for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) { + try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {} + procStore.update(proc); + } + // Delete + procStore.delete(proc.getProcId()); + } while (proc.getProcId() < LAST_PROC_ID); + } + }; + thread[i].start(); + } + + for (int i = 0; i < thread.length; ++i) { + thread[i].join(); + } + + procStore.getStoreTracker().dump(); + assertTrue(procCounter.get() >= LAST_PROC_ID); + assertTrue(procStore.getStoreTracker().isEmpty()); + assertEquals(1, procStore.getActiveLogs().size()); + } + private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { assertTrue(logFile.getLen() > dropBytes);