Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 66798187F3 for ; Sat, 23 Jan 2016 00:33:37 +0000 (UTC) Received: (qmail 90694 invoked by uid 500); 23 Jan 2016 00:33:35 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 90617 invoked by uid 500); 23 Jan 2016 00:33:35 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 90554 invoked by uid 99); 23 Jan 2016 00:33:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Jan 2016 00:33:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0E25DE01D9; Sat, 23 Jan 2016 00:33:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbertozzi@apache.org To: commits@hbase.apache.org Date: Sat, 23 Jan 2016 00:33:37 -0000 Message-Id: In-Reply-To: <10a4e8958e254e309d47371dba25121e@git.apache.org> References: <10a4e8958e254e309d47371dba25121e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hbase git commit: HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8a34c141 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8a34c141 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8a34c141 Branch: refs/heads/branch-1.2 Commit: 8a34c14129758adcea62299dfaaeb193d0279758 Parents: cd92094 Author: Matteo Bertozzi Authored: Fri Jan 22 15:57:12 2016 -0800 Committer: Matteo Bertozzi Committed: Fri Jan 22 16:23:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ProcedureInfo.java | 18 +++-- .../hadoop/hbase/procedure2/Procedure.java | 17 +---- .../hbase/procedure2/ProcedureExecutor.java | 77 ++++++++++++-------- .../hbase/procedure2/store/ProcedureStore.java | 18 ++++- .../procedure2/store/ProcedureStoreTracker.java | 10 ++- .../store/wal/ProcedureWALFormat.java | 1 - .../store/wal/ProcedureWALFormatReader.java | 50 +++++++++++-- .../procedure2/store/wal/WALProcedureStore.java | 32 ++++---- .../store/TestProcedureStoreTracker.java | 4 +- .../store/wal/TestWALProcedureStore.java | 66 +++++++++++++++-- 10 files changed, 201 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java index 11264cd..4768d98 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java @@ -41,12 +41,12 @@ public class ProcedureInfo implements Cloneable { private final String procOwner; private final ProcedureState procState; private final long parentId; + private final NonceKey nonceKey; private final ForeignExceptionMessage exception; private final long lastUpdate; private final long startTime; private final byte[] result; - private NonceKey nonceKey = null; private long clientAckTime = -1; public ProcedureInfo( @@ -55,6 +55,7 @@ public class ProcedureInfo implements Cloneable { final String procOwner, final ProcedureState procState, final long parentId, + final NonceKey nonceKey, final ForeignExceptionMessage exception, final long lastUpdate, final long startTime, @@ -64,6 +65,7 @@ public class ProcedureInfo implements Cloneable { this.procOwner = procOwner; this.procState = procState; this.parentId = parentId; + this.nonceKey = nonceKey; this.lastUpdate = lastUpdate; this.startTime = startTime; @@ -75,8 +77,8 @@ public class ProcedureInfo implements Cloneable { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL", justification="Intentional; calling super class clone doesn't make sense here.") public ProcedureInfo clone() { - return new ProcedureInfo( - procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result); + return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey, + exception, lastUpdate, startTime, result); } public long getProcId() { @@ -107,10 +109,6 @@ public class ProcedureInfo implements Cloneable { return nonceKey; } - public void setNonceKey(NonceKey nonceKey) { - this.nonceKey = nonceKey; - } - public boolean isFailed() { return exception != null; } @@ -218,12 +216,18 @@ public class ProcedureInfo implements Cloneable { */ @InterfaceAudience.Private public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) { + NonceKey nonceKey = null; + if (procProto.getNonce() != HConstants.NO_NONCE) { + nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce()); + } + return new ProcedureInfo( procProto.getProcId(), procProto.getClassName(), procProto.getOwner(), procProto.getState(), procProto.hasParentId() ? procProto.getParentId() : -1, + nonceKey, procProto.hasException() ? procProto.getException() : null, procProto.getLastUpdate(), procProto.getStartTime(), http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 8b343d5..64f817a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -640,30 +640,19 @@ public abstract class Procedure implements Comparable { */ @InterfaceAudience.Private public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) { - RemoteProcedureException exception; - - if (proc.hasException()) { - exception = proc.getException(); - } else { - exception = null; - } - ProcedureInfo procInfo = new ProcedureInfo( + RemoteProcedureException exception = proc.hasException() ? proc.getException() : null; + return new ProcedureInfo( proc.getProcId(), proc.toStringClass(), proc.getOwner(), proc.getState(), proc.hasParent() ? proc.getParentProcId() : -1, + nonceKey, exception != null ? RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null, proc.getLastUpdate(), proc.getStartTime(), proc.getResult()); - - if (nonceKey != null) { - procInfo.setNonceKey(nonceKey); - } - - return procInfo; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 95990e8..2d51744 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -308,7 +308,7 @@ public class ProcedureExecutor { public void handleCorrupted(ProcedureIterator procIter) throws IOException { int corruptedCount = 0; while (procIter.hasNext()) { - Procedure proc = procIter.next(); + ProcedureInfo proc = procIter.nextAsProcedureInfo(); LOG.error("corrupted procedure: " + proc); corruptedCount++; } @@ -321,24 +321,44 @@ public class ProcedureExecutor { private void loadProcedures(final ProcedureIterator procIter, final boolean abortOnCorruption) throws IOException { + final boolean isDebugEnabled = LOG.isDebugEnabled(); + // 1. Build the rollback stack int runnablesCount = 0; while (procIter.hasNext()) { - Procedure proc = procIter.next(); - if (!proc.hasParent() && !proc.isFinished()) { - rollbackStack.put(proc.getProcId(), new RootProcedureState()); - } - // add the procedure to the map - proc.beforeReplay(getEnvironment()); - procedures.put(proc.getProcId(), proc); + final NonceKey nonceKey; + final long procId; + + if (procIter.isNextCompleted()) { + ProcedureInfo proc = procIter.nextAsProcedureInfo(); + nonceKey = proc.getNonceKey(); + procId = proc.getProcId(); + completed.put(proc.getProcId(), proc); + if (isDebugEnabled) { + LOG.debug("The procedure is completed: " + proc); + } + } else { + Procedure proc = procIter.nextAsProcedure(); + nonceKey = proc.getNonceKey(); + procId = proc.getProcId(); - // add the nonce to the map - if (proc.getNonceKey() != null) { - nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId()); + if (!proc.hasParent()) { + assert !proc.isFinished() : "unexpected finished procedure"; + rollbackStack.put(proc.getProcId(), new RootProcedureState()); + } + + // add the procedure to the map + proc.beforeReplay(getEnvironment()); + procedures.put(proc.getProcId(), proc); + + if (proc.getState() == ProcedureState.RUNNABLE) { + runnablesCount++; + } } - if (proc.getState() == ProcedureState.RUNNABLE) { - runnablesCount++; + // add the nonce to the map + if (nonceKey != null) { + nonceKeysToProcIdsMap.put(nonceKey, procId); } } @@ -347,8 +367,15 @@ public class ProcedureExecutor { HashSet waitingSet = null; procIter.reset(); while (procIter.hasNext()) { - Procedure proc = procIter.next(); - if (LOG.isDebugEnabled()) { + if (procIter.isNextCompleted()) { + procIter.skipNext(); + continue; + } + + Procedure proc = procIter.nextAsProcedure(); + assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; + + if (isDebugEnabled) { LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s", proc.getState(), proc.hasException(), proc)); } @@ -360,18 +387,6 @@ public class ProcedureExecutor { continue; } - if (!proc.hasParent() && proc.isFinished()) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("The procedure is completed state=%s isFailed=%s", - proc.getState(), proc.hasException())); - } - assert !rollbackStack.containsKey(proc.getProcId()); - procedures.remove(proc.getProcId()); - completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey())); - - continue; - } - if (proc.hasParent() && !proc.isFinished()) { Procedure parent = procedures.get(proc.getParentProcId()); // corrupted procedures are handled later at step 3 @@ -850,13 +865,15 @@ public class ProcedureExecutor { break; } - if (proc.getProcId() == rootProcId && proc.isSuccess()) { - // Finalize the procedure state + if (proc.isSuccess()) { if (LOG.isDebugEnabled()) { LOG.debug("Procedure completed in " + StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc); } - procedureFinished(proc); + // Finalize the procedure state + if (proc.getProcId() == rootProcId) { + procedureFinished(proc); + } break; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 39a3472..5308c1b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; /** @@ -67,11 +68,26 @@ public interface ProcedureStore { boolean hasNext(); /** + * @return true if the iterator next element is a completed procedure. + */ + boolean isNextCompleted(); + + /** + * Skip the next procedure + */ + void skipNext(); + + /** * Returns the next procedure in the iteration. * @throws IOException if there was an error fetching/deserializing the procedure * @return the next procedure in the iteration. */ - Procedure next() throws IOException; + Procedure nextAsProcedure() throws IOException; + + /** + * @return the next procedure in the iteration as ProcedureInfo. + */ + ProcedureInfo nextAsProcedureInfo(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 6823288..fe2904b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -414,7 +414,9 @@ public class ProcedureStoreTracker { node.updateState(procId, isDeleted); } - public void clear() { + public void reset() { + this.keepDeletes = false; + this.partial = false; this.map.clear(); resetUpdates(); } @@ -579,11 +581,11 @@ public class ProcedureStoreTracker { } public void readFrom(final InputStream stream) throws IOException { - ProcedureProtos.ProcedureStoreTracker data = + reset(); + final ProcedureProtos.ProcedureStoreTracker data = ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream); - map.clear(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) { - BitSetNode node = BitSetNode.convert(protoNode); + final BitSetNode node = BitSetNode.convert(protoNode); map.put(node.getStart(), node); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index c75c141..22eac77 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -65,7 +65,6 @@ public final class ProcedureWALFormat { } interface Loader extends ProcedureLoader { - void removeLog(ProcedureWALFile log); void markCorruptedWAL(ProcedureWALFile log, IOException e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index fa4fccf..4d268ab 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -22,9 +22,10 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; @@ -79,7 +80,7 @@ public class ProcedureWALFormatReader { // // Fast Start: INIT/INSERT record and StackIDs // --------------------------------------------- - // We have to special record, INIT and INSERT that tracks the first time + // We have two special record, INIT and INSERT that tracks the first time // the procedure was added to the WAL. We can use that information to be able // to start procedures before reaching the end of the WAL, or before reading all the WALs. // but in some cases the WAL with that record can be already gone. @@ -146,10 +147,7 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localProcedureMap.isEmpty()) { - LOG.info("No active entry found in state log " + log + ". removing it"); - loader.removeLog(log); - } else { + if (!localProcedureMap.isEmpty()) { log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); procedureMap.mergeTail(localProcedureMap); //if (hasFastStartSupport) { @@ -215,6 +213,7 @@ public class ProcedureWALFormatReader { } maxProcId = Math.max(maxProcId, entry.getProcId()); localProcedureMap.remove(entry.getProcId()); + assert !procedureMap.contains(entry.getProcId()); tracker.setDeleted(entry.getProcId(), true); } @@ -265,6 +264,20 @@ public class ProcedureWALFormatReader { public boolean hasParent() { return proto.hasParentId(); } public boolean isReady() { return ready; } + public boolean isCompleted() { + if (!hasParent()) { + switch (proto.getState()) { + case ROLLEDBACK: + return true; + case FINISHED: + return !proto.hasException(); + default: + break; + } + } + return false; + } + public Procedure convert() throws IOException { if (procedure == null) { procedure = Procedure.convert(proto); @@ -272,6 +285,10 @@ public class ProcedureWALFormatReader { return procedure; } + public ProcedureInfo convertToInfo() { + return ProcedureInfo.convert(proto); + } + @Override public String toString() { return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")"; @@ -298,13 +315,32 @@ public class ProcedureWALFormatReader { } @Override - public Procedure next() throws IOException { + public boolean isNextCompleted() { + return current != null && current.isCompleted(); + } + + @Override + public void skipNext() { + current = current.replayNext; + } + + @Override + public Procedure nextAsProcedure() throws IOException { try { return current.convert(); } finally { current = current.replayNext; } } + + @Override + public ProcedureInfo nextAsProcedureInfo() { + try { + return current.convertToInfo(); + } finally { + current = current.replayNext; + } + } } private static class WalProcedureMap { http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index d871e37..a8d2db0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -236,8 +236,13 @@ public class WALProcedureStore extends ProcedureStoreBase { return storeTracker; } - public LinkedList getActiveLogs() { - return logs; + public ArrayList getActiveLogs() { + lock.lock(); + try { + return new ArrayList(logs); + } finally { + lock.unlock(); + } } public Set getCorruptedLogs() { @@ -296,7 +301,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Load the old logs - final ArrayList toRemove = new ArrayList(); Iterator it = logs.descendingIterator(); it.next(); // Skip the current log try { @@ -317,11 +321,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void removeLog(ProcedureWALFile log) { - toRemove.add(log); - } - - @Override public void markCorruptedWAL(ProcedureWALFile log, IOException e) { if (corruptedLogs == null) { corruptedLogs = new HashSet(); @@ -331,11 +330,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } }); } finally { - if (!toRemove.isEmpty()) { - for (ProcedureWALFile log: toRemove) { - removeLogFile(log); - } - } loading.set(false); } } @@ -584,6 +578,7 @@ public class WALProcedureStore extends ProcedureStoreBase { totalSynced = syncSlots(stream, slots, 0, slotIndex); break; } catch (Throwable e) { + LOG.warn("unable to sync slots, retry=" + retry); if (++retry >= maxRetriesBeforeRoll) { if (logRolled >= maxSyncFailureRoll) { LOG.error("Sync slots after log roll failed, abort.", e); @@ -627,14 +622,15 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriterOrDie() { - for (int i = 1; i <= rollRetries; ++i) { + for (int i = 0; i < rollRetries; ++i) { + if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); + try { if (rollWriter()) { return true; } } catch (IOException e) { - LOG.warn("Unable to roll the log, attempt=" + i, e); - Threads.sleepWithoutInterrupt(waitBeforeRoll); + LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); } } LOG.fatal("Unable to roll the log"); @@ -881,7 +877,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private long getMaxLogId(final FileStatus[] logFiles) { + private static long getMaxLogId(final FileStatus[] logFiles) { long maxLogId = 0; if (logFiles != null && logFiles.length > 0) { for (int i = 0; i < logFiles.length; ++i) { @@ -924,7 +920,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (IOException e) { LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); // try the next one... - storeTracker.clear(); + storeTracker.reset(); storeTracker.setPartialFlag(true); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 26a94d4..6bc5d36 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -192,7 +192,7 @@ public class TestProcedureStoreTracker { count++; } - tracker.clear(); + tracker.reset(); } } @@ -212,7 +212,7 @@ public class TestProcedureStoreTracker { tracker.setDeleted(i, false); } - tracker.clear(); + tracker.reset(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 18ee05b..e665cce 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -160,6 +160,56 @@ public class TestWALProcedureStore { } @Test + public void testNoTrailerDoubleRestart() throws Exception { + // log-0001: proc 0, 1 and 2 are inserted + Procedure proc0 = new TestSequentialProcedure(); + procStore.insert(proc0, null); + Procedure proc1 = new TestSequentialProcedure(); + procStore.insert(proc1, null); + Procedure proc2 = new TestSequentialProcedure(); + procStore.insert(proc2, null); + procStore.rollWriterForTesting(); + + // log-0002: proc 1 deleted + procStore.delete(proc1.getProcId()); + procStore.rollWriterForTesting(); + + // log-0003: proc 2 is update + procStore.update(proc2); + procStore.rollWriterForTesting(); + + // log-0004: proc 2 deleted + procStore.delete(proc2.getProcId()); + + // stop the store and remove the trailer + procStore.stop(false); + FileStatus[] logs = fs.listStatus(logDir); + assertEquals(4, logs.length); + for (int i = 0; i < logs.length; ++i) { + corruptLog(logs[i], 4); + } + + // Test Load 1 + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(1, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + + // Test Load 2 + assertEquals(5, fs.listStatus(logDir).length); + loader = new LoadCounter(); + storeRestart(loader); + assertEquals(1, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + + // remove proc-0 + procStore.delete(proc0.getProcId()); + procStore.periodicRollForTesting(); + assertEquals(1, fs.listStatus(logDir).length); + storeRestart(loader); + } + + @Test public void testCorruptedTrailer() throws Exception { // Insert something for (int i = 0; i < 100; ++i) { @@ -290,9 +340,9 @@ public class TestWALProcedureStore { @Override public void load(ProcedureIterator procIter) throws IOException { assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); + assertEquals(1, procIter.nextAsProcedureInfo().getProcId()); assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); + assertEquals(2, procIter.nextAsProcedureInfo().getProcId()); assertFalse(procIter.hasNext()); } @@ -346,16 +396,16 @@ public class TestWALProcedureStore { @Override public void load(ProcedureIterator procIter) throws IOException { assertTrue(procIter.hasNext()); - assertEquals(4, procIter.next().getProcId()); + assertEquals(4, procIter.nextAsProcedureInfo().getProcId()); // TODO: This will be multiple call once we do fast-start //assertFalse(procIter.hasNext()); assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); + assertEquals(1, procIter.nextAsProcedureInfo().getProcId()); assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); + assertEquals(2, procIter.nextAsProcedureInfo().getProcId()); assertTrue(procIter.hasNext()); - assertEquals(3, procIter.next().getProcId()); + assertEquals(3, procIter.nextAsProcedureInfo().getProcId()); assertFalse(procIter.hasNext()); } @@ -580,7 +630,7 @@ public class TestWALProcedureStore { @Override public void load(ProcedureIterator procIter) throws IOException { while (procIter.hasNext()) { - Procedure proc = procIter.next(); + Procedure proc = procIter.nextAsProcedure(); LOG.debug("loading procId=" + proc.getProcId() + ": " + proc); if (procIds != null) { assertTrue("procId=" + proc.getProcId() + " unexpected", @@ -593,7 +643,7 @@ public class TestWALProcedureStore { @Override public void handleCorrupted(ProcedureIterator procIter) throws IOException { while (procIter.hasNext()) { - Procedure proc = procIter.next(); + Procedure proc = procIter.nextAsProcedure(); LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc); corrupted.add(proc); }