hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r950174 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/main/java/org/apache/hadoop/hbase/util/ src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Tue, 01 Jun 2010 18:01:54 GMT
Author: stack
Date: Tue Jun  1 18:01:53 2010
New Revision: 950174

URL: http://svn.apache.org/viewvc?rev=950174&view=rev
Log:
HBASE-2437 Refactor HLog splitLog

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/pom.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=950174&r1=950173&r2=950174&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jun  1 18:01:53 2010
@@ -654,6 +654,7 @@ Release 0.21.0 - Unreleased
    HBASE-2630  HFile should use toStringBinary in various places
    HBASE-2632  Shell should autodetect terminal width
    HBASE-2636  Upgrade Jetty to 6.1.24
+   HBASE-2437  Refactor HLog splitLog (Cosmin Lehene via Stack)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=950174&r1=950173&r2=950174&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Tue Jun  1 18:01:53 2010
@@ -448,6 +448,7 @@
     <slf4j.version>1.5.8</slf4j.version>
     <stax-api>1.0.1</stax-api>
     <thrift.version>0.2.0</thrift.version>
+    <guava.version>r03</guava.version>
   </properties>
 
   <dependencyManagement>
@@ -697,10 +698,15 @@
       <version>${commons-math.version}</version>
       <scope>test</scope>
     </dependency>
-        <dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-test</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
   </dependencies>
 
   <!--

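For context: the Guava r03 artifact is pulled in for NamingThreadFactory, which the
refactored HLog below uses to give the split-writer threads recognizable names. A
minimal, self-contained sketch of that usage (illustrative only, not part of the
commit):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import com.google.common.util.concurrent.NamingThreadFactory;

    public class NamedPoolSketch {
      public static void main(String[] args) {
        // Same factory setup as in HLog.writeEditsBatchToRegions below;
        // threads come out named SplitWriter-<n>, which makes thread dumps
        // during log splitting much easier to read.
        NamingThreadFactory f = new NamingThreadFactory(
            "SplitWriter-%1$d", Executors.defaultThreadFactory());
        ThreadPoolExecutor pool =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(3, f);
        pool.execute(new Runnable() {
          public void run() {
            System.out.println(Thread.currentThread().getName());
          }
        });
        pool.shutdown();
      }
    }
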
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=950174&r1=950173&r2=950174&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Jun  1 18:01:53 2010
@@ -30,6 +30,8 @@ import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
@@ -37,9 +39,12 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -73,6 +78,9 @@ import org.apache.hadoop.hdfs.protocol.A
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
+import com.google.common.util.concurrent.NamingThreadFactory;
+
+import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
 
 /**
  * HLog stores all the edits to the HStore.  Its the hbase write-ahead-log
@@ -131,8 +139,9 @@ public class HLog implements HConstants,
   private final List<LogActionsListener> actionListeners =
       Collections.synchronizedList(new ArrayList<LogActionsListener>());
 
-  private static Class logWriterClass;
-  private static Class logReaderClass;
+
+  private static Class<? extends Writer> logWriterClass;
+  private static Class<? extends Reader> logReaderClass;
 
   private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
   private int initialReplication;    // initial replication factor of SequenceFile.writer
@@ -484,23 +493,23 @@ public class HLog implements HConstants,
    * @return A WAL reader.  Close when done with it.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public static Reader getReader(final FileSystem fs,
     final Path path, Configuration conf)
   throws IOException {
     try {
       if (logReaderClass == null) {
-        logReaderClass = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl",
-        SequenceFileLogReader.class.getCanonicalName()));
+        logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+                SequenceFileLogReader.class, Reader.class);
       }
 
-      HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance();
+      HLog.Reader reader = logReaderClass.newInstance();
       reader.init(fs, path, conf);
       return reader;
-    } catch (Exception e) {
-      IOException ie = new IOException("cannot get log reader");
-      ie.initCause(e);
-      throw ie;
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Cannot get log reader", e);
     }
   }
 
@@ -511,14 +520,13 @@ public class HLog implements HConstants,
    * @return A WAL writer.  Close when done with it.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   public static Writer createWriter(final FileSystem fs,
       final Path path, Configuration conf)
   throws IOException {
     try {
       if (logWriterClass == null) {
-        logWriterClass = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
-        SequenceFileLogWriter.class.getCanonicalName()));
+        logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
+                SequenceFileLogWriter.class, Writer.class);
       }
       HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
       writer.init(fs, path, conf);
@@ -1145,10 +1153,11 @@ public class HLog implements HConstants,
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
    *                <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
+   * @param oldLogDir directory where processed (split) logs will be archived to
    * @param fs FileSystem
    * @param conf Configuration
-   * @throws IOException
+   * @throws IOException thrown if corrupted hlogs aren't tolerated
+   * @return the list of splits
    */
   public static List<Path> splitLog(final Path rootDir, final Path srcDir,
     Path oldLogDir, final FileSystem fs, final Configuration conf)
@@ -1167,18 +1176,10 @@ public class HLog implements HConstants,
     }
     LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
       srcDir.toString());
-    splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf);
+    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
     try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for(FileStatus file : files) {
-        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.debug("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
-                   FSUtils.getPath(newPath));
-        fs.rename(file.getPath(), newPath);
-      }
-      LOG.debug("Moved " + files.length + " log files to " +
-        FSUtils.getPath(oldLogDir));
-      fs.delete(srcDir, true);
+      LOG.info("Splitting is done. Removing old log dir " + srcDir);
+      fs.delete(srcDir, false);
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1218,313 +1219,99 @@ public class HLog implements HConstants,
     }
   }
 
-  /*
-   * @param rootDir
-   * @param logfiles
+  /**
+   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+   * on multiple regions) by region and then splits them into per-region
+   * directories, in batches of hbase.hlog.split.batch.size log files.
+   *
+   * A batch consists of a set of log files that are sorted into a single map of
+   * edits indexed by region; the resulting map is concurrently written by
+   * multiple threads to the corresponding region directories.
+   *
+   * Each batch consists of one or more log files that are
+   *  - recovered (each file is opened for append, then closed, to ensure no
+   *    other process is still writing into it)
+   *  - parsed (each edit in the log is appended to a list of edits indexed by
+   *    region; see {@link #parseHLog} for more details)
+   *  - marked as either processed or corrupt, depending on the parsing outcome
+   *  - the resulting edits indexed by region are concurrently written to their
+   *    corresponding region directories
+   *  - original files are then archived to a different directory
+   *
+   * @param rootDir  hbase directory
+   * @param srcDir   logs directory
+   * @param oldLogDir directory where processed logs are archived to
+   * @param logfiles the list of log files to split
    * @param fs
    * @param conf
+   * @return the list of splits made
    * @throws IOException
-   * @return List of splits made.
    */
-  private static List<Path> splitLog(final Path rootDir,
+  private static List<Path> splitLog(final Path rootDir, final Path srcDir,
     Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
     final Configuration conf)
   throws IOException {
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
     final Map<byte [], WriterAndPath> logWriters =
       Collections.synchronizedMap(
         new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
     List<Path> splits = null;
 
-    // Number of threads to use when log splitting to rewrite the logs.
-    // More means faster but bigger mem consumption.
-    int logWriterThreads =
-      conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-
-    // Number of logs to read into memory before writing to their appropriate
-    // regions when log splitting.  More means faster but bigger mem consumption
+    // Number of logs in a read batch
+    // More means faster but bigger mem consumption
+    //TODO make a note on the conf rename and update hbase-site.xml if needed
     int logFilesPerStep =
-      conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
+      conf.getInt("hbase.hlog.split.batch.size", 3);
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
 
-    // append support = we can avoid data loss (yay)
-    // we open for append, then close to recover the correct file length
-    final boolean appendSupport = isAppend(conf);
-
-    // store corrupt logs for post-mortem analysis (empty string = discard)
-    final String corruptDir =
-      conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
-
-    List<Path> finishedFiles = new LinkedList<Path>();
-    List<Path> corruptFiles = new LinkedList<Path>();
 
     try {
-      int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
-          logFilesPerStep)).intValue();
-      for (int step = 0; step < maxSteps; step++) {
-
-        // Step 1: read N log files into memory
-        final Map<byte[], LinkedList<HLog.Entry>> logEntries =
-          new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
-        int endIndex = step == maxSteps - 1? logfiles.length:
-          step * logFilesPerStep + logFilesPerStep;
-        for (int i = (step * logFilesPerStep); i < endIndex; i++) {
-          Path curLogFile = logfiles[i].getPath();
-
-          // make sure we get the right file length before opening for read
-          recoverLog(fs, curLogFile, appendSupport);
-
-          long length = fs.getFileStatus(curLogFile).getLen();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-              ": " + curLogFile + ", length=" + length);
+      int i = -1;
+      while (i < logfiles.length) {
+        final Map<byte[], LinkedList<Entry>> editsByRegion =
+          new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
+        for (int j = 0; j < logFilesPerStep; j++) {
+          i++;
+          if (i == logfiles.length) {
+            break;
           }
-
-          Reader in = null;
-          boolean cleanRead = false;
-          int count = 0;
+          FileStatus log = logfiles[i];
+          Path logPath = log.getPath();
+          long logLength = log.getLen();
+          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+            ": " + logPath + ", length=" + logLength );
           try {
-            in = HLog.getReader(fs, curLogFile, conf);
-            try {
-              Entry entry;
-              while ((entry = in.next()) != null) {
-                byte [] regionName = entry.getKey().getRegionName();
-                LinkedList<HLog.Entry> queue = logEntries.get(regionName);
-                if (queue == null) {
-                  queue = new LinkedList<HLog.Entry>();
-                  LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName));
-                  logEntries.put(regionName, queue);
-                }
-                queue.push(entry);
-                count++;
-              }
-              LOG.debug("Pushed=" + count + " entries from " + curLogFile);
-              cleanRead = true;
-            } catch (IOException e) {
-              LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile);
-              e = RemoteExceptionHandler.checkIOException(e);
-              if (!(e instanceof EOFException)) {
-                String msg = "Exception processing " + curLogFile +
-                             " -- continuing. Possible DATA LOSS!";
-                if (corruptDir.length() > 0) {
-                  msg += "  Storing in hlog corruption directory.";
-                }
-                LOG.warn(msg, e);
-              }
-            }
-          } catch (IOException e) {
-            if (length <= 0) {
-              LOG.warn("Empty hlog, continuing: " + logfiles[i]);
-              cleanRead = true;
-              continue;
-            }
-            throw e;
-          } finally {
-            try {
-              if (in != null) {
-                in.close();
-              }
-            } catch (IOException e) {
-              LOG.warn("File.close() threw exception -- continuing, "
-                     + "but marking file as corrupt.", e);
-              cleanRead = false;
-            }
-            if (cleanRead) {
-              finishedFiles.add(curLogFile);
-            } else {
-              corruptFiles.add(curLogFile);
-            }
-            /* TODO FOR J-D REVIEW
-            // Archive the input file now so we do not replay edits. We could
-            // have gotten here because of an exception. If so, probably
-            // nothing we can do about it. Replaying it, it could work but we
-            // could be stuck replaying for ever. Just continue though we
-            // could have lost some edits.
-            fs.rename(logfiles[i].getPath(),
-                getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
-            */
+            recoverFileLease(fs, logPath, conf);
+            parseHLog(log, editsByRegion, fs, conf);
+            processedLogs.add(logPath);
+          } catch (IOException e) {
+            if (skipErrors) {
+              LOG.warn("Got an exception while parsing hlog " + logPath +
+                ". Marking it as corrupted", e);
+              corruptedLogs.add(logPath);
+            } else {
+              throw e;
+            }
           }
         }
-
-        // Step 2: Some regionserver log files have been read into memory.
-        //         Assign them to the appropriate region directory.
-        class ThreadWithException extends Thread {
-          ThreadWithException(String name) { super(name); }
-          public IOException exception = null;
-        }
-        List<ThreadWithException> threadList =
-          new ArrayList<ThreadWithException>(logEntries.size());
-        ExecutorService threadPool =
-          Executors.newFixedThreadPool(logWriterThreads);
-        for (final byte [] region: logEntries.keySet()) {
-          ThreadWithException thread =
-              new ThreadWithException(Bytes.toStringBinary(region)) {
-            @Override
-            public void run() {
-              LinkedList<HLog.Entry> entries = logEntries.get(region);
-              LOG.debug("Thread got " + entries.size() + " to process");
-              if(entries.size() <= 0) {
-                LOG.warn("Got a region with no entries to process.");
-                return;
-              }
-              long threadTime = System.currentTimeMillis();
-              try {
-                int count = 0;
-                // get the logfile associated with this region.  2 logs often
-                // write to the same region, so persist this info across logs
-                WriterAndPath wap = logWriters.get(region);
-                if (wap == null) {
-                  // first write to this region, make new logfile
-                  assert entries.size() > 0;
-                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
-                      .getTableDir(rootDir,
-                                   entries.getFirst().getKey().getTablename()),
-                      HRegionInfo.encodeRegionName(region)),
-                      HREGION_OLDLOGFILE_NAME);
-
-                  // If splitLog() was running when the user restarted his
-                  // cluster, then we could already have a 'logfile'.
-                  // Since we don't delete logs until everything is written to
-                  // their respective regions, we can safely remove this tmp.
-                  if (fs.exists(logfile)) {
-                    LOG.warn("Deleting old hlog file: " + logfile);
-                    // TODO: Archive? 
-                    fs.delete(logfile, true);
-                  }
-
-                  // associate an OutputStream with this logfile
-                  Writer w = createWriter(fs, logfile, conf);
-                  wap = new WriterAndPath(logfile, w);
-                  logWriters.put(region, wap);
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("Creating new hlog file writer for path "
-                        + logfile + " and region " + Bytes.toStringBinary(region));
-                  }
-                }
-
-                // Items were added to the linkedlist oldest first. Pull them
-                // out in that order.
-                for (ListIterator<HLog.Entry> i = entries.listIterator(entries.size());
-                    i.hasPrevious();) {
-                  wap.w.append(i.previous());
-                  count++;
-                }
-
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Applied " + count + " total edits to "
-                      + Bytes.toStringBinary(region) + " in "
-                      + (System.currentTimeMillis() - threadTime) + "ms");
-                }
-              } catch (IOException e) {
-                e = RemoteExceptionHandler.checkIOException(e);
-                LOG.warn("Got while writing region "
-                    + Bytes.toStringBinary(region) + " log " + e);
-                e.printStackTrace();
-                exception = e;
-              }
-            }
-          };
-          threadList.add(thread);
-          threadPool.execute(thread);
-        }
-        threadPool.shutdown();
-        // Wait for all threads to terminate
-        try {
-          for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
-            LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
-          }
-        } catch (InterruptedException ex) {
-          LOG.warn("Hlog writers were interrupted during splitLog().  "
-              +"Retaining log files to avoid data loss.");
-          throw new IOException(ex.getMessage(), ex.getCause());
-        }
-        // throw an exception if one of the threads reported one
-        for (ThreadWithException t : threadList) {
-          if (t.exception != null) {
-            throw t.exception;
-          }
-        }
-
-        // End of for loop. Rinse and repeat
+        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
       }
+      if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
+        throw new IOException("Discovered orphan hlog after split. Maybe " +
+          "HRegionServer was not dead when we started");
+      }
+      archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
     } finally {
       splits = new ArrayList<Path>(logWriters.size());
       for (WriterAndPath wap : logWriters.values()) {
         wap.w.close();
-        LOG.debug("Closed " + wap.p);
         splits.add(wap.p);
+        LOG.debug("Closed " + wap.p);
       }
     }
-
-    // Step 3: All writes succeeded!  Get rid of the now-unnecessary logs
-    for(Path p : finishedFiles) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Successfully split Hlog file.  Deleting " + p);
-      }
-      try {
-        if (!fs.delete(p, true) && LOG.isDebugEnabled()) {
-          LOG.debug("Delete of split Hlog (" + p + ") failed.");
-        }
-      } catch (IOException e) {
-        // don't throw error here. worst case = double-read
-        LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e);
-      }
-    }
-    for (Path p : corruptFiles) {
-      if (corruptDir.length() > 0) {
-        // store any corrupt logs for later analysis
-        Path cp = new Path(conf.get(HBASE_DIR), corruptDir);
-        if(!fs.exists(cp)) {
-          fs.mkdirs(cp);
-        }
-        Path newp = new Path(cp, p.getName());
-        if (!fs.exists(newp)) {
-          if (!fs.rename(p, newp)) {
-            LOG.warn("Rename of " + p + " to " + newp + " failed.");
-          } else {
-            LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp);
-          }
-        } else {
-          LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp +
-                   ".  Ignoring");
-        }
-      } else {
-        // data loss is less important than disk space, delete
-        try {
-          if (!fs.delete(p, true) ) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Delete of split Hlog " + p + " failed.");
-            }
-          } else {
-            LOG.warn("Corrupt Hlog (" + p + ") deleted!");
-          }
-        } catch (IOException e) {
-          LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e);
-        }
-      }
-    }
-
     return splits;
   }
 
-  /*
-   * @param conf
-   * @return True if append enabled and we have the syncFs in our path.
-   */
-  static boolean isAppend(final Configuration conf) {
-    boolean append = conf.getBoolean("dfs.support.append", false);
-    if (append) {
-      try {
-        // TODO: The implementation that comes back when we do a createWriter
-        // may not be using SequenceFile so the below is not a definitive test.
-        // Will do for now (hdfs-200).
-        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
-        append = true;
-      } catch (SecurityException e) {
-      } catch (NoSuchMethodException e) {
-        append = false;
-      }
-    }
-    return append;
-  }
 
   /**
    * Utility class that lets us keep track of the edit with it's key
@@ -1582,64 +1369,6 @@ public class HLog implements HConstants,
     }
   }
 
-  /*
-   * Recover log.
-   * Try and open log in append mode.
-   * Doing this, we get a hold of the file that crashed writer
-   * was writing to.  Once we have it, close it.  This will
-   * allow subsequent reader to see up to last sync.
-   * @param fs
-   * @param p
-   * @param append
-   */
-  public static void recoverLog(final FileSystem fs, final Path p,
-      final boolean append) throws IOException {
-    if (!append) {
-      return;
-    }
-
-    // lease recovery not needed for local file system case.
-    // currently, local file system doesn't implement append either.
-    if (!(fs instanceof DistributedFileSystem)) {
-      return;
-    }
-
-    LOG.debug("Recovering DFS lease for path " + p);
-    long startWaiting = System.currentTimeMillis();
-
-    // Trying recovery
-    boolean recovered = false;
-    while (!recovered) {
-      try {
-        FSDataOutputStream out = fs.append(p);
-        out.close();
-        recovered = true;
-      } catch (IOException e) {
-        e = RemoteExceptionHandler.checkIOException(e);
-        if (e instanceof AlreadyBeingCreatedException) {
-          // We expect that we'll get this message while the lease is still
-          // within its soft limit, but if we get it past that, it means
-          // that the RS is holding onto the file even though it lost its
-          // znode. We could potentially abort after some time here.
-          long waitedFor = System.currentTimeMillis() - startWaiting;
-
-          if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
-            LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p
-            + ":" + e.getMessage());
-          }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException ex) {
-            // ignore it and try again
-          }
-        } else {
-          throw new IOException("Failed to open " + p + " for append", e);
-        }
-      }
-    }
-    LOG.info("Past out lease recovery");
-  }
-
   /**
    * Construct the HLog directory name
    *
@@ -1687,28 +1416,247 @@ public class HLog implements HConstants,
     return new Path(oldLogDir, p.getName());
   }
 
-  private static void usage() {
-    System.err.println("Usage: java org.apache.hbase.HLog" +
-        " {--dump <logfile>... | --split <logdir>...}");
+  /**
+   * Takes splitLogsMap and concurrently writes its edits to their region directories using a thread pool
+   *
+   * @param splitLogsMap map that contains the log splitting result indexed by region
+   * @param logWriters map that contains a writer per region
+   * @param rootDir hbase root dir
+   * @param fs the file system
+   * @param conf the configuration
+   * @throws IOException
+   */
+  private static void writeEditsBatchToRegions(
+    final Map<byte[], LinkedList<Entry>> splitLogsMap,
+    final Map<byte[], WriterAndPath> logWriters,
+    final Path rootDir, final FileSystem fs, final Configuration conf)
+  throws IOException {
+    // Number of threads to use when log splitting to rewrite the logs.
+    // More means faster but bigger mem consumption.
+    int logWriterThreads =
+      conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+    HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
+    NamingThreadFactory f = new NamingThreadFactory(
+      "SplitWriter-%1$d", Executors.defaultThreadFactory());
+    ThreadPoolExecutor threadPool = (ThreadPoolExecutor)
+      Executors.newFixedThreadPool(logWriterThreads, f);
+    for (final byte[] region : splitLogsMap.keySet()) {
+      Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
+      writeFutureResult.put(region, threadPool.submit(splitter));
+    }
+
+    threadPool.shutdown();
+    // Wait for all threads to terminate
+    try {
+      for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
+        String message = "Waiting for hlog writers to terminate, elapsed " + (j + 1) * 5 + " seconds";
+        if (j < 30) {
+          LOG.debug(message);
+        } else {
+          LOG.info(message);
+        }
+      }
+    } catch(InterruptedException ex) {
+      LOG.warn("Hlog writers were interrupted, possible data loss!");
+      if (!skipErrors) {
+        throw new IOException("Could not finish writing log entries", ex);
+        //TODO maybe we should fail here regardless of whether skipErrors is active or not
+      }
+    }
+
+    for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
+      try {
+        entry.getValue().get();
+      } catch (ExecutionException e) {
+        throw (new IOException(e.getCause()));
+      } catch (InterruptedException e1) {
+        LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) +
+          " was interrupted, however the write process should have " +
+          "finished. Rethrowing", e1);
+        throw (new IOException(e1));
+      }
+    }
+  }
+
+  /*
+   * Parse a single hlog and put the edits in splitLogsMap
+   *
+   * @param logfile the log file to split
+   * @param splitLogsMap output parameter: a map with region names as keys and a
+   * list of edits as values
+   * @param fs the filesystem
+   * @param conf the configuration
+   * @throws IOException if the hlog is corrupted, or can't be opened
+   */
+  private static void parseHLog(final FileStatus logfile,
+    final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+    final Configuration conf)
+  throws IOException {
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    long length = logfile.getLen();
+    if (length <= 0) {
+      LOG.warn("File " + logfile.getPath() + " might still be open, length is 0");
+    }
+    Path path = logfile.getPath();
+    Reader in;
+    int editsCount = 0;
+    try {
+      in = HLog.getReader(fs, path, conf);
+    } catch (EOFException e) {
+      if (length <= 0) {
+        //TODO should we ignore an empty, not-last log file if skip.errors is false?
+        //Either way, the caller should decide what to do. E.g. ignore if this is the last
+        //log in sequence.
+        //TODO is this scenario still possible if the log has been recovered (i.e. closed)?
+        LOG.warn("Could not open " + path + " for reading. File is empty: " + e);
+        return;
+      } else {
+        throw e;
+      }
+    }
+    try {
+      Entry entry;
+      while ((entry = in.next()) != null) {
+        byte[] region = entry.getKey().getRegionName();
+        LinkedList<Entry> queue = splitLogsMap.get(region);
+        if (queue == null) {
+          queue = new LinkedList<Entry>();
+          splitLogsMap.put(region, queue);
+        }
+        queue.addFirst(entry);
+        editsCount++;
+      }
+      LOG.debug("Pushed=" + editsCount + " entries from " + path);
+    } finally {
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Close log reader in finally threw exception -- continuing", e);
+      }
+    }
+  }
+
+  private static Callable<Void> createNewSplitter(final Path rootDir,
+    final Map<byte[], WriterAndPath> logWriters,
+    final Map<byte[], LinkedList<Entry>> logEntries,
+    final byte[] region, final FileSystem fs, final Configuration conf) {
+    return new Callable<Void>() {
+      public String getName() {
+        return "Split writer thread for region " + Bytes.toStringBinary(region);
+      }
+
+      @Override
+      public Void call() throws IOException {
+        LinkedList<Entry> entries = logEntries.get(region);
+        LOG.debug(this.getName() + " got " + entries.size() + " edits to process");
+        long threadTime = System.currentTimeMillis();
+        try {
+          int editsCount = 0;
+          WriterAndPath wap = logWriters.get(region);
+          for (ListIterator<Entry> iterator = entries.listIterator();
+               iterator.hasNext();) {
+            Entry logEntry = iterator.next();
+
+            if (wap == null) {
+              Path logFile = getRegionLogPath(logEntry, rootDir);
+              if (fs.exists(logFile)) {
+                LOG.warn("Found existing old hlog file. It could be the result of a previous " +
+                        "failed split attempt. Deleting " + logFile +
+                        ", length=" + fs.getFileStatus(logFile).getLen());
+                fs.delete(logFile, false);
+              }
+              Writer w = createWriter(fs, logFile, conf);
+              wap = new WriterAndPath(logFile, w);
+              logWriters.put(region, wap);
+              LOG.debug("Creating writer path=" + logFile +
+                " region=" + Bytes.toStringBinary(region));
+            }
+
+            wap.w.append(logEntry);
+            editsCount++;
+          }
+          LOG.debug(this.getName() + " Applied " + editsCount +
+            " total edits to " + Bytes.toStringBinary(region) +
+            " in " + (System.currentTimeMillis() - threadTime) + "ms");
+        } catch (IOException e) {
+          e = RemoteExceptionHandler.checkIOException(e);
+          LOG.fatal(this.getName() + " Got an exception while writing a log entry", e);
+          throw e;
+        }
+        return null;
+      }
+    };
   }
 
   /**
+   * Moves processed logs to the oldLogDir after successful processing.
+   * Moves corrupted logs (any log that couldn't be successfully parsed)
+   * to corruptDir (.corrupt) for later investigation.
    *
-   * @param list
+   * @param corruptedLogs the logs that could not be parsed
+   * @param processedLogs the logs that were split successfully
+   * @param oldLogDir the directory processed logs are archived to
+   * @param fs the file system
+   * @param conf the configuration
+   * @throws IOException
    */
+  private static void archiveLogs(final List<Path> corruptedLogs,
+    final List<Path> processedLogs, final Path oldLogDir,
+    final FileSystem fs, final Configuration conf)
+  throws IOException{
+    final Path corruptDir = new Path(conf.get(HBASE_DIR),
+      conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+
+    fs.mkdirs(corruptDir);
+    fs.mkdirs(oldLogDir);
+
+    for (Path corrupted: corruptedLogs) {
+      Path p = new Path(corruptDir, corrupted.getName());
+      LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      fs.rename(corrupted, p);
+    }
+
+    for (Path p: processedLogs) {
+      Path newPath = getHLogArchivePath(oldLogDir, p);
+      fs.rename(p, newPath);
+      LOG.info("Archived processed log " + p + " to " + newPath);
+    }
+  }
+
+  private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
+    Path tableDir =
+      HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
+    Path regionDir = HRegion.getRegionDir(tableDir,
+      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+    return new Path(regionDir, HREGION_OLDLOGFILE_NAME);
+  }
+
   public void addLogActionsListerner(LogActionsListener list) {
     LOG.info("Adding a listener");
     this.actionListeners.add(list);
   }
 
-  /**
-   *
-   * @param list
-   */
   public boolean removeLogActionsListener(LogActionsListener list) {
     return this.actionListeners.remove(list);
   }
 
+  private static void usage() {
+    System.err.println("Usage: java org.apache.hbase.HLog" +
+        " {--dump <logfile>... | --split <logdir>...}");
+  }
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.

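To summarize the refactor for review, here is a condensed, self-contained sketch of
the batch flow that splitLog now follows (plain strings stand in for HLog.Entry and
FileStatus; the class and field names here are illustrative only):

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class SplitFlowSketch {
      // Stand-in for one WAL edit: the region it belongs to, plus a payload.
      static class Entry {
        final String region, edit;
        Entry(String region, String edit) { this.region = region; this.edit = edit; }
      }

      public static void main(String[] args) {
        // Four fake logs, each mixing edits for two regions.
        List<List<Entry>> logfiles = new ArrayList<List<Entry>>();
        for (int n = 0; n < 4; n++) {
          List<Entry> log = new ArrayList<Entry>();
          log.add(new Entry("bbb", "edit-" + n));
          log.add(new Entry("ccc", "edit-" + n));
          logfiles.add(log);
        }
        int batchSize = 3; // hbase.hlog.split.batch.size

        int i = -1;
        while (i < logfiles.size() - 1) {
          // Step 1: parse up to batchSize logs into one map of edits keyed by
          // region (recoverFileLease + parseHLog in the real code).
          Map<String, LinkedList<Entry>> editsByRegion =
              new TreeMap<String, LinkedList<Entry>>();
          for (int j = 0; j < batchSize && i < logfiles.size() - 1; j++) {
            i++;
            for (Entry e : logfiles.get(i)) {
              LinkedList<Entry> queue = editsByRegion.get(e.region);
              if (queue == null) {
                queue = new LinkedList<Entry>();
                editsByRegion.put(e.region, queue);
              }
              queue.addFirst(e); // mirrors parseHLog's addFirst
            }
          }
          // Step 2: writeEditsBatchToRegions hands each region's queue to a
          // pooled writer thread; here we just report what would be written.
          for (Map.Entry<String, LinkedList<Entry>> r : editsByRegion.entrySet()) {
            System.out.println(r.getKey() + " <- " + r.getValue().size() + " edits");
          }
        }
        // Step 3: archiveLogs then moves processed logs to oldLogDir and
        // corrupted ones to .corrupt.
      }
    }
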
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=950174&r1=950173&r2=950174&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Jun  1 18:01:53 2010
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.SequenceFile;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -557,4 +559,88 @@ public class FSUtils {
       return isdir;
     }
   }
+
+  /**
+   * Heuristic to determine whether it is safe to open a file for append.
+   * Looks both at dfs.support.append and uses reflection to search
+   * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
+   * @param conf the configuration
+   * @return True if append is supported
+   */
+  public static boolean isAppendSupported(final Configuration conf) {
+    boolean append = conf.getBoolean("dfs.support.append", false);
+    if (append) {
+      try {
+        // TODO: The implementation that comes back when we do a createWriter
+        // may not be using SequenceFile so the below is not a definitive test.
+        // Will do for now (hdfs-200).
+        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+        append = true;
+      } catch (SecurityException e) {
+      } catch (NoSuchMethodException e) {
+        append = false;
+      }
+    } else {
+      try {
+        // If hflush is available, append works even without dfs.support.append.
+        FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
+        append = true;
+      } catch (NoSuchMethodException e) {
+        append = false;
+      }
+    }
+    return append;
+  }
+
+
+  /*
+   * Recover file lease. Used when a file might have been left open by another process.
+   * @param fs the file system
+   * @param p the path of the file to recover the lease on
+   * @param conf the configuration
+   * @throws IOException
+   */
+  public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
+  throws IOException{
+    if (!isAppendSupported(conf)) {
+      return;
+    }
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+    LOG.info("Recovering file " + p);
+    long startWaiting = System.currentTimeMillis();
+
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        if (e instanceof AlreadyBeingCreatedException) {
+          // We expect that we'll get this message while the lease is still
+          // within its soft limit, but if we get it past that, it means
+          // that the RS is holding onto the file even though it lost its
+          // znode. We could potentially abort after some time here.
+          long waitedFor = System.currentTimeMillis() - startWaiting;
+          if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
+            LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
+              ":" + e.getMessage());
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            // ignore it and try again
+          }
+        } else {
+          throw new IOException("Failed to open " + p + " for append", e);
+        }
+      }
+    }
+    LOG.info("Finished lease recovery attempt for " + p);
+  }
+
 }
\ No newline at end of file

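A minimal usage sketch for the relocated lease-recovery helper (the WAL path below
is hypothetical; assumes an HDFS filesystem with dfs.support.append enabled):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class LeaseRecoverySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.support.append", true); // see isAppendSupported above
        FileSystem fs = FileSystem.get(conf);
        Path wal = new Path("/hbase/hlog/hlog.dat.0"); // hypothetical log path
        // Force lease recovery before reading a log that a dead regionserver
        // may still hold open, so the reported file length is trustworthy.
        FSUtils.recoverFileLease(fs, wal, conf);
        // From here it is safe to open the file with HLog.getReader(fs, wal, conf).
      }
    }
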
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java?rev=950174&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java Tue Jun  1 18:01:53 2010
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {
+
+  public static boolean activateFailure = false;
+
+  @Override
+  public void append(HLog.Entry entry) throws IOException {
+    super.append(entry);
+    if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) {
+      System.out.println(getClass().getName() + ": I will throw an exception now...");
+      throw(new IOException("This exception is instrumented and should only be thrown for testing"));
+    }
+  }
+}

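The instrumented writer above is wired in through the pluggable writer factory that
this commit reworks in HLog (conf.getClass instead of Class.forName). A minimal
sketch of registering it, mirroring the test setup that follows:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.InstrumentedSequenceFileLogWriter;

    public class PluggableWriterSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The value must now be a subclass of HLog.Writer, which conf.getClass
        // enforces at lookup time; Class.forName gave no such guarantee.
        conf.setClass("hbase.regionserver.hlog.writer.impl",
            InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
        // HLog.createWriter(fs, path, conf) will now instantiate the class above.
      }
    }
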
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=950174&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Tue Jun  1 18:01:53 2010
@@ -0,0 +1,735 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Testing {@link HLog} splitting code.
+ */
+public class TestHLogSplit {
+
+  private Configuration conf;
+  private FileSystem fs;
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+
+  private static final Path hbaseDir = new Path("/hbase");
+  private static final Path hlogDir = new Path(hbaseDir, "hlog");
+  private static final Path oldLogDir = new Path(hbaseDir, "hlog.old");
+  private static final Path corruptDir = new Path(hbaseDir, ".corrupt");
+
+  private static final int NUM_WRITERS = 10;
+  private static final int ENTRIES = 10; // entries per writer per region
+
+  private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
+  private long seq = 0;
+  private static final byte[] TABLE_NAME = "t1".getBytes();
+  private static final byte[] FAMILY = "f1".getBytes();
+  private static final byte[] QUALIFIER = "q1".getBytes();
+  private static final byte[] VALUE = "v1".getBytes();
+  private static final String HLOG_FILE_PREFIX = "hlog.dat.";
+  private static List<String> regions;
+  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
+
+
+  static enum Corruptions {
+    INSERT_GARBAGE_ON_FIRST_LINE,
+    INSERT_GARBAGE_IN_THE_MIDDLE,
+    APPEND_GARBAGE,
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().
+            setInt("hbase.regionserver.flushlogentries", 1);
+    TEST_UTIL.getConfiguration().
+            setBoolean("dfs.support.append", true);
+    TEST_UTIL.getConfiguration().
+            setStrings("hbase.rootdir", hbaseDir.toString());
+    TEST_UTIL.getConfiguration().
+            setClass("hbase.regionserver.hlog.writer.impl",
+                InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
+
+    TEST_UTIL.startMiniDFSCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries){
+      fs.delete(dir.getPath(), true);
+    }
+    seq = 0;
+    regions = new ArrayList<String>();
+    Collections.addAll(regions, "bbb", "ccc");
+    InstrumentedSequenceFileLogWriter.activateFailure = false;
+    // Set the soft lease for hdfs to be down from default of 5 minutes or so.
+    // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another.
+    // Not available in 0.20 hdfs
+    // TEST_UTIL.getDFSCluster().getNamesystem().leaseManager.
+    //  setLeasePeriod(100, 50000);
+    // Use reflection to get at the 0.20 version of above.
+    MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster();
+    //   private NameNode nameNode;
+    Field field = dfsCluster.getClass().getDeclaredField("nameNode");
+    field.setAccessible(true);
+    NameNode nn = (NameNode)field.get(dfsCluster);
+    nn.namesystem.leaseManager.setLeasePeriod(100, 50000);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test(expected = IOException.class)
+  public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
+  throws IOException {
+    AtomicBoolean stop = new AtomicBoolean(false);
+    generateHLogs(-1);
+    fs.initialize(fs.getUri(), conf);
+    try {
+      (new ZombieNewLogWriterRegionServer(stop)).start();
+      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    } finally {
+      stop.set(true);
+    }
+  }
+
+  //TODO: check the edits order is respected (scenarios)
+
+  @Test
+  public void testEmptyLogFiles() throws IOException {
+
+    injectEmptyFile(".empty", true);
+    generateHLogs(Integer.MAX_VALUE);
+    injectEmptyFile("empty", true);
+
+    // make fs act as a different client now
+    // initialize will create a new DFSClient with a new client ID
+    fs.initialize(fs.getUri(), conf);
+
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+    }
+
+  }
+
+
+  @Test
+  public void testEmptyOpenLogFiles() throws IOException {
+    injectEmptyFile(".empty", false);
+    generateHLogs(Integer.MAX_VALUE);
+    injectEmptyFile("empty", false);
+
+    // make fs act as a different client now
+    // initialize will create a new DFSClient with a new client ID
+    fs.initialize(fs.getUri(), conf);
+
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+    }
+  }
+
+  @Test
+  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
+    // generate logs but leave hlog.dat.5 open.
+    generateHLogs(5);
+
+    fs.initialize(fs.getUri(), conf);
+
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+    }
+
+
+  }
+
+
+  @Test
+  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateHLogs(Integer.MAX_VALUE);
+    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+            Corruptions.APPEND_GARBAGE, true, fs);
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+    }
+
+
+  }
+
+  @Test
+  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateHLogs(Integer.MAX_VALUE);
+    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+            Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
+    }
+
+
+  }
+
+
+  @Test
+  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateHLogs(Integer.MAX_VALUE);
+    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+            Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      // the entries in the original logs are alternating regions
+      // considering the sequence file header, the middle corruption should
+      // affect at least half of the entries
+      int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
+      int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
+      assertTrue("The file up to the corrupted area hasn't been parsed",
+              goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
+    }
+  }
+
+  @Test
+  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+
+    Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
+    Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5");
+    Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1));
+    generateHLogs(-1);
+    corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
+    corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs);
+    corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
+
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    FileStatus[] archivedLogs = fs.listStatus(corruptDir);
+
+    assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName());
+    assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName());
+    assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName());
+    assertEquals(3, archivedLogs.length);
+
+  }
+
+  @Test
+  public void testLogsGetArchivedAfterSplit() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    generateHLogs(-1);
+
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
+
+    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
+  }
+
+
+
+  @Test(expected = IOException.class)
+  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    generateHLogs(Integer.MAX_VALUE);
+    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+            Corruptions.APPEND_GARBAGE, true, fs);
+
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+  }
+
+  @Test
+  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    generateHLogs(-1);
+    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+            Corruptions.APPEND_GARBAGE, true, fs);
+    fs.initialize(fs.getUri(), conf);
+    try {
+      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    } catch (IOException e) {/* expected */}
+
+    assertEquals("if skip.errors is false all files should remain in place",
+            NUM_WRITERS, fs.listStatus(hlogDir).length);
+  }
+
+
+  @Test
+  public void testSplit() throws IOException {
+    generateHLogs(-1);
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    for (String region : regions) {
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+
+    }
+  }
+
+  @Test
+  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
+  throws IOException {
+    generateHLogs(-1);
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    FileStatus [] statuses = null;
+    try {
+      statuses = fs.listStatus(hlogDir);
+      assertNull(statuses);
+    } catch (FileNotFoundException e) {
+      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
+    }
+  }
+/* DISABLED for now.  TODO: HBASE-2645 
+  @Test
+  public void testLogCannotBeWrittenOnceParsed() throws IOException {
+    AtomicLong counter = new AtomicLong(0);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    generateHLogs(9);
+    fs.initialize(fs.getUri(), conf);
+
+    Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
+
+
+
+    try {
+      zombie.start();
+
+      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
+
+      // It's possible that the writer got an error while appending and didn't count it;
+      // the entry will nevertheless be written to the file and split with the rest.
+      long numberOfEditsInRegion = countHLog(logfile, fs, conf);
+      assertTrue("The log file could have at most 1 extra log entry, but " +
+              "can't have less. Zombie could write " + counter.get() +
+              " and logfile had only " + numberOfEditsInRegion + " " + logfile,
+              counter.get() == numberOfEditsInRegion ||
+                      counter.get() + 1 == numberOfEditsInRegion);
+    } finally {
+      stop.set(true);
+    }
+  }
+*/
+
+  @Test
+  public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
+  throws IOException {
+    AtomicBoolean stop = new AtomicBoolean(false);
+    generateHLogs(-1);
+    fs.initialize(fs.getUri(), conf);
+    Thread zombie = new ZombieNewLogWriterRegionServer(stop);
+
+    try {
+      zombie.start();
+      try {
+        HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      } catch (IOException ex) {/* expected */}
+      int logFilesNumber = fs.listStatus(hlogDir).length;
+
+      assertEquals("Log files should not be archived if there's an extra file after split",
+              NUM_WRITERS + 1, logFilesNumber);
+    } finally {
+      stop.set(true);
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
+    // leave the 5th log open so we can append the "trap" entry below
+    generateHLogs(4);
+
+    fs.initialize(fs.getUri(), conf);
+
+    InstrumentedSequenceFileLogWriter.activateFailure = false;
+    appendEntry(writer[4], TABLE_NAME, Bytes.toBytes("break"),
+            ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+    writer[4].close();
+
+    try {
+      InstrumentedSequenceFileLogWriter.activateFailure = true;
+      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    } catch (IOException e) {
+      assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
+      throw e;
+    } finally {
+      InstrumentedSequenceFileLogWriter.activateFailure = false;
+    }
+  }
+
+//  @Test
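+  // Disabled (@Test commented out above): splits logs for 500 regions, moves
+  // the archived logs back and renames the first split output aside, splits
+  // again, and verifies the two split outputs are identical.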
+  public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
+    regions.clear();
+    for (int i = 0; i < 500; i++) {
+      regions.add("region__" + i);
+    }
+
+    generateHLogs(1, 100, -1);
+    fs.initialize(fs.getUri(), conf);
+
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    fs.rename(oldLogDir, hlogDir);
+    Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
+    Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
+    fs.rename(splitPath, firstSplitPath);
+
+    fs.initialize(fs.getUri(), conf);
+    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    FileStatus[] f1 = fs.listStatus(firstSplitPath);
+    FileStatus[] f2 = fs.listStatus(splitPath);
+
+    for (int i = 0; i < f1.length; i++) {
+      HLog.Reader in1 = HLog.getReader(fs, new Path(f1[i].getPath(), "oldlogfile.log"), conf);
+      HLog.Reader in2 = HLog.getReader(fs, new Path(f2[i].getPath(), "oldlogfile.log"), conf);
+      HLog.Entry entry1;
+      HLog.Entry entry2;
+      while ((entry1 = in1.next()) != null) {
+        entry2 = in2.next();
+        assertEquals(0, entry1.getKey().compareTo(entry2.getKey()));
+        assertEquals(entry1.getEdit().toString(), entry2.getEdit().toString());
+      }
+      in1.close();
+      in2.close();
+    }
+  }
+
+  /**
+   * This thread keeps writing to the file after the split process has started.
+   * It simulates a region server that was considered dead but woke up and wrote
+   * some more to the last log file.
+   */
+  class ZombieLastLogWriterRegionServer extends Thread {
+    AtomicLong editsCount;
+    AtomicBoolean stop;
+    Path log;
+    HLog.Writer lastLogWriter;
+    public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter, AtomicBoolean stop) {
+      this.stop = stop;
+      this.editsCount = counter;
+      this.lastLogWriter = writer;
+    }
+
+    @Override
+    public void run() {
+      if (stop.get()) {
+        return;
+      }
+      flushToConsole("starting");
+      while (true) {
+        try {
+          appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(),
+                  ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+          lastLogWriter.sync();
+          editsCount.incrementAndGet();
+          try {
+            Thread.sleep(1);
+          } catch (InterruptedException e) {
+            // ignored; keep writing
+          }
+        } catch (IOException ex) {
+          if (ex instanceof RemoteException) {
+            flushToConsole("Juliet: got RemoteException " +
+                    ex.getMessage() + " while writing " + (editsCount.get() + 1));
+            break;
+          } else {
+            assertTrue("Failed to write " + editsCount.get(), false);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This thread keeps adding new log files.
+   * It simulates a region server that was considered dead but woke up and wrote
+   * some more to a new hlog.
+   */
+  class ZombieNewLogWriterRegionServer extends Thread {
+    AtomicBoolean stop;
+    public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
+      super("ZombieNewLogWriterRegionServer");
+      this.stop = stop;
+    }
+
+    @Override
+    public void run() {
+      if (stop.get()) {
+        return;
+      }
+      boolean splitStarted = false;
+      Path p = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
+      while (!splitStarted) {
+        try {
+          FileStatus[] statuses = fs.listStatus(p);
+          // In 0.20, listStatus comes back with null if the file doesn't exist.
+          // In 0.21, it throws FNFE.
+          if (statuses != null && statuses.length > 0) {
+            // Done.
+            break;
+          }
+        } catch (FileNotFoundException e) {
+          // Expected in hadoop 0.21
+        } catch (IOException e1) {
+          assertTrue("Failed to list status ", false);
+        }
+        flushToConsole("Juliet: split not started, sleeping a bit...");
+        Threads.sleep(100);
+      }
+
+      Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
+      try {
+        HLog.Writer writer = HLog.createWriter(fs, julietLog, conf);
+        appendEntry(writer, "juliet".getBytes(), "juliet".getBytes(),
+                "r".getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+        writer.close();
+        flushToConsole("Juliet file creator: created file " + julietLog);
+      } catch (IOException e1) {
+        assertTrue("Failed to create file " + julietLog, false);
+      }
+    }
+  }
+
+  private void flushToConsole(String s) {
+    System.out.println(s);
+    System.out.flush();
+  }
+
+  private void generateHLogs(int leaveOpen) throws IOException {
+    generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
+  }
+
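+  /**
+   * Writes <code>entries</code> edits per region into each of <code>writers</code>
+   * logs; the writer at index <code>leaveOpen</code> is left unclosed (pass -1
+   * to close them all).
+   */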
+  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+    for (int i = 0; i < writers; i++) {
+      writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
+      for (int j = 0; j < entries; j++) {
+        int prefix = 0;
+        for (String region : regions) {
+          String rowKey = region + prefix++ + i + j;
+          appendEntry(writer[i], TABLE_NAME, region.getBytes(),
+                  rowKey.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
+        }
+      }
+      if (i != leaveOpen) {
+        writer[i].close();
+        flushToConsole("Closing writer " + i);
+      }
+    }
+  }
+
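+  /** Returns the path of the split output ("oldlogfile.log") for the given region. */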
+  private Path getLogForRegion(Path rootdir, byte[] table, String region) {
+    return new Path(HRegion.getRegionDir(HTableDescriptor
+            .getTableDir(rootdir, table),
+            HRegionInfo.encodeRegionName(region.getBytes())),
+            "oldlogfile.log");
+  }
+
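+  /**
+   * Corrupts the hlog at <code>path</code> in the requested way: appends raw
+   * garbage, prepends a garbage byte, or injects a garbage byte mid-file.
+   * When <code>close</code> is false the stream is flushed but left open.
+   */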
+  private void corruptHLog(Path path, Corruptions corruption, boolean close,
+                           FileSystem fs) throws IOException {
+    FSDataOutputStream out;
+    int fileSize = (int) fs.getFileStatus(path).getLen();
+
+    // Read the whole file so it can be rewritten with garbage injected.
+    FSDataInputStream in = fs.open(path);
+    byte[] corruptedBytes = new byte[fileSize];
+    in.readFully(0, corruptedBytes, 0, fileSize);
+    in.close();
+
+    switch (corruption) {
+      case APPEND_GARBAGE:
+        out = fs.append(path);
+        out.write("-----".getBytes());
+        closeOrFlush(close, out);
+        break;
+
+      case INSERT_GARBAGE_ON_FIRST_LINE:
+        fs.delete(path, false);
+        out = fs.create(path);
+        out.write(0);
+        out.write(corruptedBytes);
+        closeOrFlush(close, out);
+        break;
+
+      case INSERT_GARBAGE_IN_THE_MIDDLE:
+        fs.delete(path, false);
+        out = fs.create(path);
+        int middle = corruptedBytes.length / 2;
+        out.write(corruptedBytes, 0, middle);
+        out.write(0);
+        out.write(corruptedBytes, middle, corruptedBytes.length - middle);
+        closeOrFlush(close, out);
+        break;
+    }
+  }
+
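+  /** Closes the stream, or just syncs it when <code>close</code> is false. */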
+  private void closeOrFlush(boolean close, FSDataOutputStream out)
+  throws IOException {
+    if (close) {
+      out.close();
+    } else {
+      out.sync();
+      // out.hflush() is not in hadoop 0.20; sync() stands in for it.
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
+    HLog.Entry entry;
+    HLog.Reader in = HLog.getReader(fs, log, conf);
+    while ((entry = in.next()) != null) {
+      System.out.println(entry);
+    }
+    in.close();
+  }
+
+  private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
+    int count = 0;
+    HLog.Reader in = HLog.getReader(fs, log, conf);
+    while (in.next() != null) {
+      count++;
+    }
+    in.close();
+    return count;
+  }
+
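+  /**
+   * Appends a single Put edit for <code>row</code> to <code>writer</code>,
+   * syncs it, and returns the incremented sequence number.
+   */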
+  public long appendEntry(HLog.Writer writer, byte[] table, byte[] region,
+                          byte[] row, byte[] family, byte[] qualifier,
+                          byte[] value, long seq)
+          throws IOException {
+    long time = System.nanoTime();
+    WALEdit edit = new WALEdit();
+    seq++;
+    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+    writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
+    writer.sync();
+    return seq;
+  }
+
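+  /**
+   * Creates an hlog with no edits, named with the given suffix; when
+   * <code>closeFile</code> is false the writer is left open.
+   */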
+  private void injectEmptyFile(String suffix, boolean closeFile)
+          throws IOException {
+    HLog.Writer writer = HLog.createWriter(
+            fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
+    if (closeFile) writer.close();
+  }
+
+  @SuppressWarnings("unused")
+  private void listLogs(FileSystem fs, Path dir) throws IOException {
+    for (FileStatus file : fs.listStatus(dir)) {
+      System.out.println(file.getPath());
+    }
+  }
+
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=950174&r1=950173&r2=950174&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Tue Jun  1 18:01:53 2010
@@ -31,13 +31,13 @@ import org.apache.hadoop.hbase.HBaseClus
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -220,7 +220,7 @@ public class TestLogRolling extends HBas
     
     assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
     // don't run this test without append support (HDFS-200 & HDFS-142)
-    assertTrue("Need append support for this test", HLog.isAppend(conf));
+    assertTrue("Need append support for this test", FSUtils.isAppendSupported(conf));
 
     // add up the datanode count, to ensure proper replication when we kill 1
     dfsCluster.startDataNodes(conf, 1, true, null, null);


