hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [3/3] hbase git commit: HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed
Date Sat, 23 Jan 2016 00:33:37 GMT
HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8a34c141
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8a34c141
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8a34c141

Branch: refs/heads/branch-1.2
Commit: 8a34c14129758adcea62299dfaaeb193d0279758
Parents: cd92094
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Fri Jan 22 15:57:12 2016 -0800
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Fri Jan 22 16:23:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ProcedureInfo.java  | 18 +++--
 .../hadoop/hbase/procedure2/Procedure.java      | 17 +----
 .../hbase/procedure2/ProcedureExecutor.java     | 77 ++++++++++++--------
 .../hbase/procedure2/store/ProcedureStore.java  | 18 ++++-
 .../procedure2/store/ProcedureStoreTracker.java | 10 ++-
 .../store/wal/ProcedureWALFormat.java           |  1 -
 .../store/wal/ProcedureWALFormatReader.java     | 50 +++++++++++--
 .../procedure2/store/wal/WALProcedureStore.java | 32 ++++----
 .../store/TestProcedureStoreTracker.java        |  4 +-
 .../store/wal/TestWALProcedureStore.java        | 66 +++++++++++++++--
 10 files changed, 201 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index 11264cd..4768d98 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -41,12 +41,12 @@ public class ProcedureInfo implements Cloneable {
   private final String procOwner;
   private final ProcedureState procState;
   private final long parentId;
+  private final NonceKey nonceKey;
   private final ForeignExceptionMessage exception;
   private final long lastUpdate;
   private final long startTime;
   private final byte[] result;
 
-  private NonceKey nonceKey = null;
   private long clientAckTime = -1;
 
   public ProcedureInfo(
@@ -55,6 +55,7 @@ public class ProcedureInfo implements Cloneable {
       final String procOwner,
       final ProcedureState procState,
       final long parentId,
+      final NonceKey nonceKey,
       final ForeignExceptionMessage exception,
       final long lastUpdate,
       final long startTime,
@@ -64,6 +65,7 @@ public class ProcedureInfo implements Cloneable {
     this.procOwner = procOwner;
     this.procState = procState;
     this.parentId = parentId;
+    this.nonceKey = nonceKey;
     this.lastUpdate = lastUpdate;
     this.startTime = startTime;
 
@@ -75,8 +77,8 @@ public class ProcedureInfo implements Cloneable {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
       justification="Intentional; calling super class clone doesn't make sense here.")
   public ProcedureInfo clone() {
-    return new ProcedureInfo(
-      procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result);
+    return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
+      exception, lastUpdate, startTime, result);
   }
 
   public long getProcId() {
@@ -107,10 +109,6 @@ public class ProcedureInfo implements Cloneable {
     return nonceKey;
   }
 
-  public void setNonceKey(NonceKey nonceKey) {
-    this.nonceKey = nonceKey;
-  }
-
   public boolean isFailed() {
     return exception != null;
   }
@@ -218,12 +216,18 @@ public class ProcedureInfo implements Cloneable {
    */
   @InterfaceAudience.Private
   public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
+    NonceKey nonceKey = null;
+    if (procProto.getNonce() != HConstants.NO_NONCE) {
+      nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
+    }
+
     return new ProcedureInfo(
       procProto.getProcId(),
       procProto.getClassName(),
       procProto.getOwner(),
       procProto.getState(),
       procProto.hasParentId() ? procProto.getParentId() : -1,
+      nonceKey,
       procProto.hasException() ? procProto.getException() : null,
       procProto.getLastUpdate(),
       procProto.getStartTime(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 8b343d5..64f817a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -640,30 +640,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
    */
   @InterfaceAudience.Private
   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
-    RemoteProcedureException exception;
-
-    if (proc.hasException()) {
-      exception = proc.getException();
-    } else {
-      exception = null;
-    }
-    ProcedureInfo procInfo = new ProcedureInfo(
+    RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
+    return new ProcedureInfo(
       proc.getProcId(),
       proc.toStringClass(),
       proc.getOwner(),
       proc.getState(),
       proc.hasParent() ? proc.getParentProcId() : -1,
+      nonceKey,
       exception != null ?
           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
       proc.getLastUpdate(),
       proc.getStartTime(),
       proc.getResult());
-
-    if (nonceKey != null) {
-      procInfo.setNonceKey(nonceKey);
-    }
-
-    return procInfo;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 95990e8..2d51744 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -308,7 +308,7 @@ public class ProcedureExecutor<TEnvironment> {
       public void handleCorrupted(ProcedureIterator procIter) throws IOException {
         int corruptedCount = 0;
         while (procIter.hasNext()) {
-          Procedure proc = procIter.next();
+          ProcedureInfo proc = procIter.nextAsProcedureInfo();
           LOG.error("corrupted procedure: " + proc);
           corruptedCount++;
         }
@@ -321,24 +321,44 @@ public class ProcedureExecutor<TEnvironment> {
 
   private void loadProcedures(final ProcedureIterator procIter,
       final boolean abortOnCorruption) throws IOException {
+    final boolean isDebugEnabled = LOG.isDebugEnabled();
+
     // 1. Build the rollback stack
     int runnablesCount = 0;
     while (procIter.hasNext()) {
-      Procedure proc = procIter.next();
-      if (!proc.hasParent() && !proc.isFinished()) {
-        rollbackStack.put(proc.getProcId(), new RootProcedureState());
-      }
-      // add the procedure to the map
-      proc.beforeReplay(getEnvironment());
-      procedures.put(proc.getProcId(), proc);
+      final NonceKey nonceKey;
+      final long procId;
+
+      if (procIter.isNextCompleted()) {
+        ProcedureInfo proc = procIter.nextAsProcedureInfo();
+        nonceKey = proc.getNonceKey();
+        procId = proc.getProcId();
+        completed.put(proc.getProcId(), proc);
+        if (isDebugEnabled) {
+          LOG.debug("The procedure is completed: " + proc);
+        }
+      } else {
+        Procedure proc = procIter.nextAsProcedure();
+        nonceKey = proc.getNonceKey();
+        procId = proc.getProcId();
 
-      // add the nonce to the map
-      if (proc.getNonceKey() != null) {
-        nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
+        if (!proc.hasParent()) {
+          assert !proc.isFinished() : "unexpected finished procedure";
+          rollbackStack.put(proc.getProcId(), new RootProcedureState());
+        }
+
+        // add the procedure to the map
+        proc.beforeReplay(getEnvironment());
+        procedures.put(proc.getProcId(), proc);
+
+        if (proc.getState() == ProcedureState.RUNNABLE) {
+          runnablesCount++;
+        }
       }
 
-      if (proc.getState() == ProcedureState.RUNNABLE) {
-        runnablesCount++;
+      // add the nonce to the map
+      if (nonceKey != null) {
+        nonceKeysToProcIdsMap.put(nonceKey, procId);
       }
     }
 
@@ -347,8 +367,15 @@ public class ProcedureExecutor<TEnvironment> {
     HashSet<Procedure> waitingSet = null;
     procIter.reset();
     while (procIter.hasNext()) {
-      Procedure proc = procIter.next();
-      if (LOG.isDebugEnabled()) {
+      if (procIter.isNextCompleted()) {
+        procIter.skipNext();
+        continue;
+      }
+
+      Procedure proc = procIter.nextAsProcedure();
+      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
+
+      if (isDebugEnabled) {
         LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
                     proc.getState(), proc.hasException(), proc));
       }
@@ -360,18 +387,6 @@ public class ProcedureExecutor<TEnvironment> {
         continue;
       }
 
-      if (!proc.hasParent() && proc.isFinished()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
-                    proc.getState(), proc.hasException()));
-        }
-        assert !rollbackStack.containsKey(proc.getProcId());
-        procedures.remove(proc.getProcId());
-        completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
-
-        continue;
-      }
-
       if (proc.hasParent() && !proc.isFinished()) {
         Procedure parent = procedures.get(proc.getParentProcId());
         // corrupted procedures are handled later at step 3
@@ -850,13 +865,15 @@ public class ProcedureExecutor<TEnvironment> {
         break;
       }
 
-      if (proc.getProcId() == rootProcId && proc.isSuccess()) {
-        // Finalize the procedure state
+      if (proc.isSuccess()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Procedure completed in " +
               StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
         }
-        procedureFinished(proc);
+        // Finalize the procedure state
+        if (proc.getProcId() == rootProcId) {
+          procedureFinished(proc);
+        }
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 39a3472..5308c1b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 
 /**
@@ -67,11 +68,26 @@ public interface ProcedureStore {
     boolean hasNext();
 
     /**
+     * @return true if the iterator next element is a completed procedure.
+     */
+    boolean isNextCompleted();
+
+    /**
+     * Skip the next procedure
+     */
+    void skipNext();
+
+    /**
      * Returns the next procedure in the iteration.
      * @throws IOException if there was an error fetching/deserializing the procedure
      * @return the next procedure in the iteration.
      */
-    Procedure next() throws IOException;
+    Procedure nextAsProcedure() throws IOException;
+
+    /**
+     * @return the next procedure in the iteration as ProcedureInfo.
+     */
+    ProcedureInfo nextAsProcedureInfo();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 6823288..fe2904b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -414,7 +414,9 @@ public class ProcedureStoreTracker {
     node.updateState(procId, isDeleted);
   }
 
-  public void clear() {
+  public void reset() {
+    this.keepDeletes = false;
+    this.partial = false;
     this.map.clear();
     resetUpdates();
   }
@@ -579,11 +581,11 @@ public class ProcedureStoreTracker {
   }
 
   public void readFrom(final InputStream stream) throws IOException {
-    ProcedureProtos.ProcedureStoreTracker data =
+    reset();
+    final ProcedureProtos.ProcedureStoreTracker data =
         ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
-    map.clear();
     for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
-      BitSetNode node = BitSetNode.convert(protoNode);
+      final BitSetNode node = BitSetNode.convert(protoNode);
       map.put(node.getStart(), node);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index c75c141..22eac77 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -65,7 +65,6 @@ public final class ProcedureWALFormat {
   }
 
   interface Loader extends ProcedureLoader {
-    void removeLog(ProcedureWALFile log);
     void markCorruptedWAL(ProcedureWALFile log, IOException e);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index fa4fccf..4d268ab 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -79,7 +80,7 @@ public class ProcedureWALFormatReader {
   //
   //  Fast Start: INIT/INSERT record and StackIDs
   // ---------------------------------------------
-  // We have to special record, INIT and INSERT that tracks the first time
+  // We have two special record, INIT and INSERT that tracks the first time
   // the procedure was added to the WAL. We can use that information to be able
   // to start procedures before reaching the end of the WAL, or before reading all the WALs.
   // but in some cases the WAL with that record can be already gone.
@@ -146,10 +147,7 @@ public class ProcedureWALFormatReader {
       loader.markCorruptedWAL(log, e);
     }
 
-    if (localProcedureMap.isEmpty()) {
-      LOG.info("No active entry found in state log " + log + ". removing it");
-      loader.removeLog(log);
-    } else {
+    if (!localProcedureMap.isEmpty()) {
       log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
       procedureMap.mergeTail(localProcedureMap);
       //if (hasFastStartSupport) {
@@ -215,6 +213,7 @@ public class ProcedureWALFormatReader {
     }
     maxProcId = Math.max(maxProcId, entry.getProcId());
     localProcedureMap.remove(entry.getProcId());
+    assert !procedureMap.contains(entry.getProcId());
     tracker.setDeleted(entry.getProcId(), true);
   }
 
@@ -265,6 +264,20 @@ public class ProcedureWALFormatReader {
     public boolean hasParent() { return proto.hasParentId(); }
     public boolean isReady() { return ready; }
 
+    public boolean isCompleted() {
+      if (!hasParent()) {
+        switch (proto.getState()) {
+          case ROLLEDBACK:
+            return true;
+          case FINISHED:
+            return !proto.hasException();
+          default:
+            break;
+        }
+      }
+      return false;
+    }
+
     public Procedure convert() throws IOException {
       if (procedure == null) {
         procedure = Procedure.convert(proto);
@@ -272,6 +285,10 @@ public class ProcedureWALFormatReader {
       return procedure;
     }
 
+    public ProcedureInfo convertToInfo() {
+      return ProcedureInfo.convert(proto);
+    }
+
     @Override
     public String toString() {
       return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
@@ -298,13 +315,32 @@ public class ProcedureWALFormatReader {
     }
 
     @Override
-    public Procedure next() throws IOException {
+    public boolean isNextCompleted() {
+      return current != null && current.isCompleted();
+    }
+
+    @Override
+    public void skipNext() {
+      current = current.replayNext;
+    }
+
+    @Override
+    public Procedure nextAsProcedure() throws IOException {
       try {
         return current.convert();
       } finally {
         current = current.replayNext;
       }
     }
+
+    @Override
+    public ProcedureInfo nextAsProcedureInfo() {
+      try {
+        return current.convertToInfo();
+      } finally {
+        current = current.replayNext;
+      }
+    }
   }
 
   private static class WalProcedureMap {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index d871e37..a8d2db0 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -236,8 +236,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return storeTracker;
   }
 
-  public LinkedList<ProcedureWALFile> getActiveLogs() {
-    return logs;
+  public ArrayList<ProcedureWALFile> getActiveLogs() {
+    lock.lock();
+    try {
+      return new ArrayList<ProcedureWALFile>(logs);
+    } finally {
+      lock.unlock();
+    }
   }
 
   public Set<ProcedureWALFile> getCorruptedLogs() {
@@ -296,7 +301,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     // Load the old logs
-    final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
     Iterator<ProcedureWALFile> it = logs.descendingIterator();
     it.next(); // Skip the current log
     try {
@@ -317,11 +321,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
 
         @Override
-        public void removeLog(ProcedureWALFile log) {
-          toRemove.add(log);
-        }
-
-        @Override
         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
           if (corruptedLogs == null) {
             corruptedLogs = new HashSet<ProcedureWALFile>();
@@ -331,11 +330,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
       });
     } finally {
-      if (!toRemove.isEmpty()) {
-        for (ProcedureWALFile log: toRemove) {
-          removeLogFile(log);
-        }
-      }
       loading.set(false);
     }
   }
@@ -584,6 +578,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         totalSynced = syncSlots(stream, slots, 0, slotIndex);
         break;
       } catch (Throwable e) {
+        LOG.warn("unable to sync slots, retry=" + retry);
         if (++retry >= maxRetriesBeforeRoll) {
           if (logRolled >= maxSyncFailureRoll) {
             LOG.error("Sync slots after log roll failed, abort.", e);
@@ -627,14 +622,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   private boolean rollWriterOrDie() {
-    for (int i = 1; i <= rollRetries; ++i) {
+    for (int i = 0; i < rollRetries; ++i) {
+      if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
+
       try {
         if (rollWriter()) {
           return true;
         }
       } catch (IOException e) {
-        LOG.warn("Unable to roll the log, attempt=" + i, e);
-        Threads.sleepWithoutInterrupt(waitBeforeRoll);
+        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
       }
     }
     LOG.fatal("Unable to roll the log");
@@ -881,7 +877,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private long getMaxLogId(final FileStatus[] logFiles) {
+  private static long getMaxLogId(final FileStatus[] logFiles) {
     long maxLogId = 0;
     if (logFiles != null && logFiles.length > 0) {
       for (int i = 0; i < logFiles.length; ++i) {
@@ -924,7 +920,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       } catch (IOException e) {
         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
         // try the next one...
-        storeTracker.clear();
+        storeTracker.reset();
         storeTracker.setPartialFlag(true);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 26a94d4..6bc5d36 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -192,7 +192,7 @@ public class TestProcedureStoreTracker {
         count++;
       }
 
-      tracker.clear();
+      tracker.reset();
     }
   }
 
@@ -212,7 +212,7 @@ public class TestProcedureStoreTracker {
           tracker.setDeleted(i, false);
         }
 
-        tracker.clear();
+        tracker.reset();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8a34c141/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 18ee05b..e665cce 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -160,6 +160,56 @@ public class TestWALProcedureStore {
   }
 
   @Test
+  public void testNoTrailerDoubleRestart() throws Exception {
+    // log-0001: proc 0, 1 and 2 are inserted
+    Procedure proc0 = new TestSequentialProcedure();
+    procStore.insert(proc0, null);
+    Procedure proc1 = new TestSequentialProcedure();
+    procStore.insert(proc1, null);
+    Procedure proc2 = new TestSequentialProcedure();
+    procStore.insert(proc2, null);
+    procStore.rollWriterForTesting();
+
+    // log-0002: proc 1 deleted
+    procStore.delete(proc1.getProcId());
+    procStore.rollWriterForTesting();
+
+    // log-0003: proc 2 is update
+    procStore.update(proc2);
+    procStore.rollWriterForTesting();
+
+    // log-0004: proc 2 deleted
+    procStore.delete(proc2.getProcId());
+
+    // stop the store and remove the trailer
+    procStore.stop(false);
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(4, logs.length);
+    for (int i = 0; i < logs.length; ++i) {
+      corruptLog(logs[i], 4);
+    }
+
+    // Test Load 1
+    LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(1, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+
+    // Test Load 2
+    assertEquals(5, fs.listStatus(logDir).length);
+    loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(1, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+
+    // remove proc-0
+    procStore.delete(proc0.getProcId());
+    procStore.periodicRollForTesting();
+    assertEquals(1, fs.listStatus(logDir).length);
+    storeRestart(loader);
+  }
+
+  @Test
   public void testCorruptedTrailer() throws Exception {
     // Insert something
     for (int i = 0; i < 100; ++i) {
@@ -290,9 +340,9 @@ public class TestWALProcedureStore {
       @Override
       public void load(ProcedureIterator procIter) throws IOException {
         assertTrue(procIter.hasNext());
-        assertEquals(1, procIter.next().getProcId());
+        assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
         assertTrue(procIter.hasNext());
-        assertEquals(2, procIter.next().getProcId());
+        assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
         assertFalse(procIter.hasNext());
       }
 
@@ -346,16 +396,16 @@ public class TestWALProcedureStore {
       @Override
       public void load(ProcedureIterator procIter) throws IOException {
         assertTrue(procIter.hasNext());
-        assertEquals(4, procIter.next().getProcId());
+        assertEquals(4, procIter.nextAsProcedureInfo().getProcId());
         // TODO: This will be multiple call once we do fast-start
         //assertFalse(procIter.hasNext());
 
         assertTrue(procIter.hasNext());
-        assertEquals(1, procIter.next().getProcId());
+        assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
         assertTrue(procIter.hasNext());
-        assertEquals(2, procIter.next().getProcId());
+        assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
         assertTrue(procIter.hasNext());
-        assertEquals(3, procIter.next().getProcId());
+        assertEquals(3, procIter.nextAsProcedureInfo().getProcId());
         assertFalse(procIter.hasNext());
       }
 
@@ -580,7 +630,7 @@ public class TestWALProcedureStore {
     @Override
     public void load(ProcedureIterator procIter) throws IOException {
       while (procIter.hasNext()) {
-        Procedure proc = procIter.next();
+        Procedure proc = procIter.nextAsProcedure();
         LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
         if (procIds != null) {
           assertTrue("procId=" + proc.getProcId() + " unexpected",
@@ -593,7 +643,7 @@ public class TestWALProcedureStore {
     @Override
     public void handleCorrupted(ProcedureIterator procIter) throws IOException {
       while (procIter.hasNext()) {
-        Procedure proc = procIter.next();
+        Procedure proc = procIter.nextAsProcedure();
         LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
         corrupted.add(proc);
       }


Mime
View raw message