accumulo-commits mailing list archives

From els...@apache.org
Subject [1/4] git commit: ACCUMULO-3182 Ensures that a WAL with a partial header (or completely empty) can be successfully recovered.
Date Thu, 02 Oct 2014 23:19:29 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 65acebbf9 -> a65f1234e
  refs/heads/master 9a8554f9c -> 089408d59


ACCUMULO-3182 Ensures that a WAL with a partial header (or completely empty) can be successfully recovered.

In the case where a WAL is observed which has a partial or missing header,
the file is treated as "empty": no attempt is made to read and replay
log entries from it. This should restore the normal functionality
that was observed with 1.5 WALs.
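
For context, the detection approach amounts to catching the EOFException
that readFully() raises on a truncated header. A minimal sketch of the
idea in Java (the real change lives in DfsLogger below; WalHeaderCheck
and isHeaderComplete() are illustrative names only, not committed code):

  import java.io.DataInputStream;
  import java.io.EOFException;
  import java.io.IOException;

  public class WalHeaderCheck {
    // Returns false when the stream ends before headerLength bytes could be
    // read, i.e. a tserver died before writing the complete header.
    public static boolean isHeaderComplete(DataInputStream input, int headerLength) throws IOException {
      byte[] buffer = new byte[headerLength];
      try {
        input.readFully(buffer);
      } catch (EOFException e) {
        return false; // partial or missing header: treat the WAL as empty
      }
      return true; // header bytes present; magic-value validation happens elsewhere
    }
  }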

An integration test was introduced which verifies that the introduction
of either an empty WAL or a WAL with a partial header does not block
recovery of the tablet that references the "bad" WAL.

Additionally, an enum was introduced to replace the hard-coded WAL
recovery status marker file names: "finished" and "failed".
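
The call-site cleanup this enables is mechanical; for example, the
RecoveryManager change in the diff below:

  // Before: hard-coded marker file name
  if (master.getFileSystem().exists(new Path(dest, "finished"))) {

  // After: the enum owns the marker file names
  if (master.getFileSystem().exists(SortedLogState.getFinishedMarkerPath(dest))) {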


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d65e0e32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d65e0e32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d65e0e32

Branch: refs/heads/1.6
Commit: d65e0e32de29206b627e578b957ce66c0d644cea
Parents: 3c90ee9
Author: Josh Elser <elserj@apache.org>
Authored: Thu Oct 2 19:04:44 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Oct 2 19:04:44 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/log/SortedLogState.java     |  66 ++++++
 .../master/recovery/RecoveryManager.java        |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   |   9 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  | 157 ++++++++------
 .../apache/accumulo/tserver/log/LogSorter.java  |  36 ++-
 .../accumulo/tserver/log/MultiReader.java       |  34 +--
 .../accumulo/tserver/logger/LogReader.java      |  13 +-
 .../tserver/log/SortedLogRecoveryTest.java      |   3 +-
 .../tserver/log/TestUpgradePathForWALogs.java   |   3 +-
 .../MissingWalHeaderCompletesRecoveryIT.java    | 217 +++++++++++++++++++
 10 files changed, 443 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
new file mode 100644
index 0000000..f337cd8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file is written in the destination directory for the sorting of write-ahead logs that need recovering. The value of {@link #marker} is the name of the file
+ * that will exist in the sorted output directory.
+ */
+public enum SortedLogState {
+  FINISHED("finished"), FAILED("failed");
+
+  private String marker;
+
+  private SortedLogState(String marker) {
+    this.marker = marker;
+  }
+
+  public String getMarker() {
+    return marker;
+  }
+
+  public static boolean isFinished(String fileName) {
+    return FINISHED.getMarker().equals(fileName);
+  }
+
+  public static boolean isFailed(String fileName) {
+    return FAILED.getMarker().equals(fileName);
+  }
+
+  public static Path getFinishedMarkerPath(String rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFinishedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FINISHED.getMarker());
+  }
+
+  public static Path getFailedMarkerPath(String rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+  
+  public static Path getFailedMarkerPath(Path rootPath) {
+    return new Path(rootPath, FAILED.getMarker());
+  }
+
+  @Override
+  public String toString() {
+    return marker;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index 76d3520..0c2e1f0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 import org.apache.accumulo.server.master.recovery.LogCloser;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
@@ -154,7 +155,7 @@ public class RecoveryManager {
           }
         }
 
-        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
+        if (master.getFileSystem().exists(SortedLogState.getFinishedMarkerPath(dest))) {
           synchronized (this) {
             closeTasksQueued.remove(sortId);
             recoveryDelay.remove(sortId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 03dc86d..2551bd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,6 +161,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -250,7 +251,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  
+
   private TabletServerLogger logger;
 
   protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
@@ -336,7 +337,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (sawChange) {
       log.debug(sb.toString());
     }
-    
+
     final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
       long diff = now - lastMemoryCheckTime;
@@ -347,7 +348,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       lastMemoryCheckTime = now;
       return;
     }
-    
+
     if (maxIncreaseInCollectionTime > keepAliveTimeout) {
      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
     }
@@ -3700,7 +3701,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       Path recovery = null;
       for (String log : entry.logSet) {
         Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
-        finished = new Path(finished, "finished");
+        finished = SortedLogState.getFinishedMarkerPath(finished);
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
           recovery = finished.getParent();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index b152380..2bd0b47 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -67,12 +68,11 @@ import org.apache.log4j.Logger;
 
 /**
  * Wrap a connection to a logger.
- * 
+ *
  */
 public class DfsLogger {
-  // Package private so that LogSorter can find this
-  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
-  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
+  public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
   private static Logger log = Logger.getLogger(DfsLogger.class);
 
@@ -84,6 +84,26 @@ public class DfsLogger {
     }
   }
 
+  /**
+   * A well-timed tabletserver failure could result in an incomplete header written to a write-ahead log. This exception is thrown when the header cannot be
+   * read from a WAL which should only happen when the tserver dies as described.
+   */
+  public static class LogHeaderIncompleteException extends IOException {
+    private static final long serialVersionUID = 1l;
+
+    public LogHeaderIncompleteException(String msg) {
+      super(msg);
+    }
+
+    public LogHeaderIncompleteException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+
+    public LogHeaderIncompleteException(Throwable cause) {
+      super(cause);
+    }
+  }
+
   public static class DFSLoggerInputStreams {
 
     private FSDataInputStream originalInput;
@@ -229,7 +249,9 @@ public class DfsLogger {
 
   /**
   * Reference a pre-existing log file.
-   * @param meta the cq for the "log" entry in +r/!0
+   *
+   * @param meta
+   *          the cq for the "log" entry in +r/!0
    */
  public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
     this.conf = conf;
@@ -243,75 +265,82 @@ public class DfsLogger {
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
     byte[] magicBuffer = new byte[magic.length];
-    input.readFully(magicBuffer);
-    if (Arrays.equals(magicBuffer, magic)) {
-      // additional parameters it needs from the underlying stream.
-      String cryptoModuleClassname = input.readUTF();
-      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
+    try {
+      input.readFully(magicBuffer);
+      if (Arrays.equals(magicBuffer, magic)) {
+        // additional parameters it needs from the underlying stream.
+        String cryptoModuleClassname = input.readUTF();
+        CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 
-      // Create the parameters and set the input stream into those parameters
-      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-      params.setEncryptedInputStream(input);
+        // Create the parameters and set the input stream into those parameters
+        CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+        params.setEncryptedInputStream(input);
 
-      // Create the plaintext input stream from the encrypted one
-      params = cryptoModule.getDecryptingInputStream(params);
+        // Create the plaintext input stream from the encrypted one
+        params = cryptoModule.getDecryptingInputStream(params);
 
-      if (params.getPlaintextInputStream() instanceof DataInputStream) {
-        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-      } else {
-        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-      }
-    } else {
-      input.seek(0);
-      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
-      byte[] magicBufferV2 = new byte[magicV2.length];
-      input.readFully(magicBufferV2);
-
-      if (Arrays.equals(magicBufferV2, magicV2)) {
-        // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
-        // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
-        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
-
-        // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
-        // parameters
-        Map<String,String> opts = new HashMap<String,String>();
-        int count = input.readInt();
-        for (int i = 0; i < count; i++) {
-          String key = input.readUTF();
-          String value = input.readUTF();
-          opts.put(key, value);
+        if (params.getPlaintextInputStream() instanceof DataInputStream) {
+          decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+        } else {
+          decryptingInput = new DataInputStream(params.getPlaintextInputStream());
         }
+      } else {
+        input.seek(0);
+        byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+        byte[] magicBufferV2 = new byte[magicV2.length];
+        input.readFully(magicBufferV2);
+
+        if (Arrays.equals(magicBufferV2, magicV2)) {
+          // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
+          // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
+          // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
+
+          // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
+          // parameters
+          Map<String,String> opts = new HashMap<String,String>();
+          int count = input.readInt();
+          for (int i = 0; i < count; i++) {
+            String key = input.readUTF();
+            String value = input.readUTF();
+            opts.put(key, value);
+          }
 
-        if (opts.size() == 0) {
-          // NullCryptoModule, we're done
-          decryptingInput = input;
-        } else {
+          if (opts.size() == 0) {
+            // NullCryptoModule, we're done
+            decryptingInput = input;
+          } else {
 
-          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
-          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-              .getCryptoModule(DefaultCryptoModule.class.getName());
+            // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
+            org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+                .getCryptoModule(DefaultCryptoModule.class.getName());
 
-          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+            CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 
-          input.seek(0);
-          input.readFully(magicBufferV2);
-          params.setEncryptedInputStream(input);
+            input.seek(0);
+            input.readFully(magicBufferV2);
+            params.setEncryptedInputStream(input);
 
-          params = cryptoModule.getDecryptingInputStream(params);
-          if (params.getPlaintextInputStream() instanceof DataInputStream) {
-            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
-          } else {
-            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            params = cryptoModule.getDecryptingInputStream(params);
+            if (params.getPlaintextInputStream() instanceof DataInputStream) {
+              decryptingInput = (DataInputStream) params.getPlaintextInputStream();
+            } else {
+              decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            }
           }
-        }
 
-      } else {
+        } else {
 
-        input.seek(0);
-        decryptingInput = input;
-      }
+          input.seek(0);
+          decryptingInput = input;
+        }
 
+      }
+    } catch (EOFException e) {
+      log.warn("Got EOFException trying to read WAL header information, assuming the rest
of the file (" + path + ") has no data.");
+      // A TabletServer might have died before the (complete) header was written
+      throw new LogHeaderIncompleteException(e);
     }
+
     return new DFSLoggerInputStreams(input, decryptingInput);
   }
 
@@ -438,7 +467,7 @@ public class DfsLogger {
     }
 
     // wait for background thread to finish before closing log file
-    if(syncThread != null){
+    if (syncThread != null) {
       try {
         syncThread.join();
       } catch (InterruptedException e) {
@@ -446,12 +475,12 @@ public class DfsLogger {
       }
     }
 
-    //expect workq should be empty at this point
-    if(workQueue.size() != 0){
+    // expect workq should be empty at this point
+    if (workQueue.size() != 0) {
       log.error("WAL work queue not empty after sync thread exited");
       throw new IllegalStateException("WAL work queue not empty after sync thread exited");
     }
-    
+
     if (encryptingLogFile != null)
       try {
         logFile.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index bb8e3c7..6095f88 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -37,9 +37,11 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * 
+ *
  */
 public class LogSorter {
 
@@ -110,7 +112,19 @@ public class LogSorter {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        DFSLoggerInputStreams inputStreams;
+        try {
+          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
+          // Creating a 'finished' marker will cause recovery to proceed normally and the
+          // empty file will be correctly ignored downstream.
+          fs.mkdirs(new Path(destPath));
+          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          return;
+        }
+
         this.input = inputStreams.getOriginalInput();
         this.decryptingInput = inputStreams.getDecryptingInputStream();
 
@@ -140,7 +154,7 @@ public class LogSorter {
         try {
           // parent dir may not exist
           fs.mkdirs(new Path(destPath));
-          fs.create(new Path(destPath, "failed")).close();
+          fs.create(SortedLogState.getFailedMarkerPath(destPath)).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
@@ -158,10 +172,10 @@ public class LogSorter {
       }
     }
 
-    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       Path path = new Path(destPath, String.format("part-r-%05d", part++));
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      
+
       @SuppressWarnings("deprecation")
      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
       try {
@@ -180,10 +194,14 @@ public class LogSorter {
     }
 
     synchronized void close() throws IOException {
-      bytesCopied = input.getPos();
-      input.close();
-      decryptingInput.close();
-      input = null;
+      // If we receive an empty or malformed-header WAL, we won't
+      // have input streams that need closing. Avoid the NPE.
+      if (null != input) {
+        bytesCopied = input.getPos();
+        input.close();
+        decryptingInput.close();
+        input = null;
+      }
     }
 
     public synchronized long getSortTime() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
index 541f075..c2a0683 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
@@ -20,6 +20,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.collections.buffer.PriorityBuffer;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,23 +33,23 @@ import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Provide simple Map.Reader methods over multiple Maps.
- * 
+ *
 * Presently only supports next() and seek() and works on all the Map directories within a directory. The primary purpose of this class is to merge the results
 * of multiple Reduce jobs that result in Map output files.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class MultiReader {
-  
+
   /**
    * Group together the next key/value from a Reader with the Reader
-   * 
+   *
    */
   private static class Index implements Comparable<Index> {
     Reader reader;
     WritableComparable key;
     Writable value;
     boolean cached = false;
-    
+
     private static Object create(java.lang.Class<?> klass) {
       try {
         return klass.getConstructor().newInstance();
@@ -56,19 +57,20 @@ public class MultiReader {
         throw new RuntimeException("Unable to construct objects to use for comparison");
       }
     }
-    
+
     public Index(Reader reader) {
       this.reader = reader;
       key = (WritableComparable) create(reader.getKeyClass());
       value = (Writable) create(reader.getValueClass());
     }
-    
+
     private void cache() throws IOException {
       if (!cached && reader.next(key, value)) {
         cached = true;
       }
     }
-    
+
+    @Override
     public int compareTo(Index o) {
       try {
         cache();
@@ -84,16 +86,16 @@ public class MultiReader {
       }
     }
   }
-  
+
   private PriorityBuffer heap = new PriorityBuffer();
-  
+
   @SuppressWarnings("deprecation")
   public MultiReader(VolumeManager fs, Path directory) throws IOException {
     boolean foundFinish = false;
     for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
-      if (child.getPath().getName().equals("finished")) {
+      if (SortedLogState.isFinished(child.getPath().getName())) {
         foundFinish = true;
         continue;
       }
@@ -101,9 +103,9 @@ public class MultiReader {
       heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
-      throw new IOException("Sort \"finished\" flag not found in " + directory);
+      throw new IOException("Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not
found in " + directory);
   }
-  
+
   private static void copy(Writable src, Writable dest) throws IOException {
     // not exactly efficient...
     DataOutputBuffer output = new DataOutputBuffer();
@@ -112,7 +114,7 @@ public class MultiReader {
     input.reset(output.getData(), output.getLength());
     dest.readFields(input);
   }
-  
+
  public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
     Index elt = (Index) heap.remove();
     try {
@@ -129,7 +131,7 @@ public class MultiReader {
     }
     return true;
   }
-  
+
   public synchronized boolean seek(WritableComparable key) throws IOException {
     PriorityBuffer reheap = new PriorityBuffer(heap.size());
     boolean result = false;
@@ -149,7 +151,7 @@ public class MultiReader {
     heap = reheap;
     return result;
   }
-  
+
   public void close() throws IOException {
     IOException problem = null;
     for (Object obj : heap) {
@@ -164,5 +166,5 @@ public class MultiReader {
       throw problem;
     heap = null;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index a1229e7..c3f4fd0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -35,14 +35,17 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
+import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
 public class LogReader {
+  private static final Logger log = Logger.getLogger(LogReader.class);
 
   static class Opts extends Help {
    @Parameter(names = "-r", description = "print only mutations associated with the given row")
@@ -59,7 +62,7 @@ public class LogReader {
 
   /**
    * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
-   * 
+   *
    * @param args
    *          - first argument is the file to print
    */
@@ -96,7 +99,13 @@ public class LogReader {
 
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        DFSLoggerInputStreams streams;
+        try {
+          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
+        } catch (LogHeaderIncompleteException e) {
+          log.warn("Could not read header for " + path + ". Ignoring...");
+          continue;
+        }
         DataInputStream input = streams.getDecryptingInputStream();
 
         try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index fffa15e..03361d1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -132,7 +133,7 @@ public class SortedLogRecoveryTest {
           map.append(lfe.key, lfe.value);
         }
         map.close();
-        ns.create(new Path(path, "finished")).close();
+        ns.create(SortedLogState.getFinishedMarkerPath(path)).close();
         dirs.add(new Path(path));
       }
       // Recover

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index d6c23e3..1f2ce6a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
@@ -66,7 +67,7 @@ public class TestUpgradePathForWALogs {
     fs = VolumeManagerImpl.getLocal(path);
     Path manyMapsPath = new Path("file://" + path);
     fs.mkdirs(manyMapsPath);
-    fs.create(new Path(manyMapsPath, "finished")).close();
+    fs.create(SortedLogState.getFinishedMarkerPath(manyMapsPath)).close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d65e0e32/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
new file mode 100644
index 0000000..7f2f6f9
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ *
+ */
+public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(MissingWalHeaderCompletesRecoveryIT.class);
+
+  private static boolean rootHasWritePermission;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration conf) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    // Make sure the GC doesn't delete the file before the metadata reference is added
+    cfg.setProperty(Property.GC_CYCLE_START, "999999s");
+    conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Before
+  public void setupMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    rootHasWritePermission = conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    if (!rootHasWritePermission) {
+      conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      // Make sure it propagates through ZK
+      Thread.sleep(5000);
+    }
+  }
+
+  @After
+  public void resetMetadataPermission() throws Exception {
+    Connector conn = getConnector();
+    // Final state doesn't match the original
+    if (rootHasWritePermission != conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE)) {
+      if (rootHasWritePermission) {
+        // root had write permission when starting, ensure root still does
+        conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      } else {
+        // root did not have write permission when starting, ensure that it does not
+        conn.securityOperations().revokeTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port, it's irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File emptyWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created empty WAL at " + emptyWalog.toURI());
+
+    fs.create(new Path(emptyWalog.toURI())).close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = emptyWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the empty file was ignored)
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+  @Test
+  public void testPartialHeaderWalRecoveryCompletes() throws Exception {
+    Connector conn = getConnector();
+    MiniAccumuloClusterImpl cluster = getCluster();
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // Fake out something that looks like host:port, it's irrelevant
+    String fakeServer = "127.0.0.1:12345";
+
+    File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+    File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+    File partialHeaderWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+    log.info("Created WAL with malformed header at " + partialHeaderWalog.toURI());
+
+    // Write half of the header
+    FSDataOutputStream wal = fs.create(new Path(partialHeaderWalog.toURI()));
+    wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(), 0, DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
+    wal.close();
+
+    Assert.assertTrue("root user did not have write permission to metadata table",
+        conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = partialHeaderWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+
+    log.info("Taking {} offline", tableName);
+    conn.tableOperations().offline(tableName, true);
+
+    log.info("{} is offline", tableName);
+
+    Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+    Mutation m = new Mutation(row);
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Bringing {} online", tableName);
+    conn.tableOperations().online(tableName, true);
+
+    log.info("{} is online", tableName);
+
+    // Reading the table implies that recovery completed successfully (the empty file was ignored)
+    // otherwise the tablet will never come online and we won't be able to read it.
+    Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+  }
+
+}

