hbase-commits mailing list archives

From st...@apache.org
Subject hbase git commit: HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask
Date Sat, 31 Jan 2015 03:12:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/master b08802a3e -> 825871431


HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/82587143
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/82587143
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/82587143

Branch: refs/heads/master
Commit: 825871431ec48036fd3e3cd9625c451b50cbe308
Parents: b08802a
Author: stack <stack@apache.org>
Authored: Fri Jan 30 19:12:17 2015 -0800
Committer: stack <stack@apache.org>
Committed: Fri Jan 30 19:12:17 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/DistributedHBaseCluster.java   |   6 +-
 .../test/IntegrationTestBigLinkedList.java      | 416 ++++++++++++++++---
 .../hadoop/hbase/backup/HFileArchiver.java      |   4 +-
 .../hadoop/hbase/mapreduce/WALPlayer.java       |  64 +--
 .../hadoop/hbase/master/SplitLogManager.java    |   2 +-
 .../hbase/master/cleaner/CleanerChore.java      |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  43 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |   1 +
 .../org/apache/hadoop/hbase/util/FSUtils.java   |   2 +-
 .../hadoop/hbase/wal/WALPrettyPrinter.java      |  27 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    |  46 +-
 hbase-server/src/test/data/0000000000000016310  | Bin 0 -> 11776703 bytes
 .../hadoop/hbase/HBaseTestingUtility.java       |   4 +-
 .../hbase/regionserver/TestRecoveredEdits.java  | 177 ++++++++
 14 files changed, 644 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
index 4a3a64a..6e7cd33 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
@@ -20,15 +20,13 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterManager.ServiceType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
@@ -42,8 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
-import com.google.common.collect.Sets;
-
 /**
  * Manages the interactions with an already deployed distributed cluster (as opposed to
  * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index 29bb2bb..9864031 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.test;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
@@ -57,9 +65,9 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -70,13 +78,15 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
@@ -89,10 +99,15 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -382,6 +397,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         current[i] = new byte[key.getLength()];
         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
         if (++i == current.length) {
+          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
+            Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
+            ", i=" + i);
           persist(output, count, prev, current, id);
           i = 0;
 
@@ -473,8 +491,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
               "pre-splitting table into " + totalNumberOfRegions + " regions " +
               "(default regions per server: " + regionsPerServer + ")");
 
-          byte[][] splits = new RegionSplitter.UniformSplit().split(
-              totalNumberOfRegions);
+          byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
 
           admin.createTable(htd, splits);
         }
@@ -564,6 +581,159 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   /**
+   * Tool to search missing rows in WALs and hfiles.
+   * Pass in a file or dir of keys to search for. The key file must have been written by the
+   * Verify step (we depend on the format it writes out). We'll read the keys in and then
+   * search for them in the hbase WALs and oldWALs dirs (some of this is TODO).
+   */
+  static class Search extends Configured implements Tool {
+    private static final Log LOG = LogFactory.getLog(Search.class);
+    protected Job job;
+
+    private static void printUsage(final String error) {
+      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
+      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length < 1 || args.length > 2) {
+        printUsage(null);
+        return 1;
+      }
+      Path inputDir = new Path(args[0]);
+      int numMappers = 1;
+      if (args.length > 1) {
+        numMappers = Integer.parseInt(args[1]);
+      }
+      return run(inputDir, numMappers);
+    }
+
+    /**
+     * WALPlayer override that searches for keys loaded in the setup.
+     */
+    public static class WALSearcher extends WALPlayer {
+      public WALSearcher(Configuration conf) {
+        super(conf);
+      }
+
+      /**
+       * The actual searcher mapper.
+       */
+      public static class WALMapperSearcher extends WALMapper {
+        private SortedSet<byte []> keysToFind;
+
+        @Override
+        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+            throws IOException {
+          super.setup(context);
+          try {
+            this.keysToFind = readKeysToSearch(context.getConfiguration());
+            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+          } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+          }
+        }
+
+        @Override
+        protected boolean filter(Context context, Cell cell) {
+          // TODO: Can I do a better compare than this copying out key?
+          byte [] row = new byte [cell.getRowLength()];
+          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+          boolean b = this.keysToFind.contains(row);
+          if (b) {
+            String keyStr = Bytes.toStringBinary(row);
+            LOG.info("Found cell=" + cell);
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          return b;
+        }
+      }
+
+      // Put in place the above WALMapperSearcher.
+      @Override
+      public Job createSubmittableJob(String[] args) throws IOException {
+        Job job = super.createSubmittableJob(args);
+        // Call my class instead.
+        job.setJarByClass(WALMapperSearcher.class);
+        job.setMapperClass(WALMapperSearcher.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        return job;
+      }
+    }
+
+    static final String FOUND_GROUP_KEY = "Found";
+    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+    public int run(Path inputDir, int numMappers) throws Exception {
+      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+      SortedSet<byte []> keys = readKeysToSearch(getConf());
+      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+      LOG.info("Count of keys to find: " + keys.size());
+      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
+      Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+      // Now read all WALs. In two dirs. Presumes certain layout.
+      Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+      Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
+        " against " + getConf().get(HConstants.HBASE_DIR));
+      int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
+      if (ret != 0) return ret;
+      return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
+    }
+
+    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+    throws IOException, InterruptedException {
+      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+      FileSystem fs = FileSystem.get(conf);
+      SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      if (!fs.exists(keysInputDir)) {
+        throw new FileNotFoundException(keysInputDir.toString());
+      }
+      if (!fs.isDirectory(keysInputDir)) {
+        throw new UnsupportedOperationException("TODO");
+      } else {
+        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+        while(iterator.hasNext()) {
+          LocatedFileStatus keyFileStatus = iterator.next();
+          // Skip "_SUCCESS" file.
+          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
+        }
+      }
+      return result;
+    }
+
+    private static SortedSet<byte []> readFileToSearch(final Configuration conf,
+        final FileSystem fs, final LocatedFileStatus keyFileStatus)
+    throws IOException, InterruptedException {
+      SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
+      // what is missing.
+      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
+          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
+        InputSplit is =
+          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
+        rr.initialize(is, context);
+        while (rr.nextKeyValue()) {
+          rr.getCurrentKey();
+          BytesWritable bw = rr.getCurrentValue();
+          switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
+          case UNDEFINED:
+            byte [] key = new byte [rr.getCurrentKey().getLength()];
+            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
+              rr.getCurrentKey().getLength());
+            result.add(key);
+            break;
+          }
+        }
+      }
+      return result;
+    }
+  }
+
+  /**
    * A Map Reduce job that verifies that the linked lists generated by
    * {@link Generator} do not have any holes.
    */
@@ -594,21 +764,84 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       }
     }
 
+    /**
+     * Don't change the order of these enums. Their ordinals are used as type flag when we emit
+     * problems found from the reducer.
+     */
     public static enum Counts {
-      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
+      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES
     }
 
-    public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
+    /**
+     * Per reducer, we output problem rows as byte arrays so they can be used as input for
+     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a two-byte flag
+     * saying what sort of emission it is. The flag is the Counts enum ordinal written as a short.
+     */
+    public static class VerifyReducer
+    extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
+      private final BytesWritable UNREF =
+        new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte [] {}));
 
       private AtomicInteger rows = new AtomicInteger(0);
+      private Connection connection;
+
+      @Override
+      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+      throws IOException, InterruptedException {
+        super.setup(context);
+        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
+      }
+
+      @Override
+      protected void cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
+      throws IOException, InterruptedException {
+        if (this.connection != null) this.connection.close();
+        super.cleanup(context);
+      }
+
+      /**
+       * @param ordinal
+       * @param r
+       * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
+       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
+       */
+      public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
+        byte [] prefix = Bytes.toBytes((short)ordinal);
+        if (prefix.length != Bytes.SIZEOF_SHORT) {
+          throw new RuntimeException("Unexpected size: " + prefix.length);
+        }
+        byte [] result = new byte [prefix.length + r.length];
+        System.arraycopy(prefix, 0, result, 0, prefix.length);
+        System.arraycopy(r, 0, result, prefix.length, r.length);
+        return result;
+      }
+
+      /**
+       * @param bs
+       * @return Type from the Counts enum of this row. Reads prefix added by
+       * {@link #addPrefixFlag(int, byte[])}
+       */
+      public static Counts whichType(final byte [] bs) {
+        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
+        return Counts.values()[ordinal];
+      }
+
+      /**
+       * @param bw
+       * @return Row bytes minus the type flag.
+       */
+      public static byte [] getRowOnly(BytesWritable bw) {
+        byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
+        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
+        return bytes;
+      }
 
       @Override
       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
-          throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
 
         int defCount = 0;
-
         refs.clear();
         for (BytesWritable type : values) {
           if (type.getLength() == DEF.getLength()) {
@@ -621,48 +854,110 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         }
 
         // TODO check for more than one def, should not happen
-
         StringBuilder refsSb = null;
-        String keyString = null;
+        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
         if (defCount == 0 || refs.size() != 1) {
-          refsSb = new StringBuilder();
-          String comma = "";
-          for (byte[] ref : refs) {
-            refsSb.append(comma);
-            comma = ",";
-            refsSb.append(Bytes.toStringBinary(ref));
-          }
-          keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
-
-          LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
+          refsSb = dumpExtraInfoOnRefs(key, context, refs);
+          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
+            (refsSb != null? refsSb.toString(): ""));
         }
 
         if (defCount == 0 && refs.size() > 0) {
-          // this is bad, found a node that is referenced but not defined. It must have been
+          // This is bad, found a node that is referenced but not defined. It must have been
           // lost, emit some info about this node for debugging purposes.
-          context.write(new Text(keyString), new Text(refsSb.toString()));
-          context.getCounter(Counts.UNDEFINED).increment(1);
+          // Write out a line per reference. If more than one, flag it.
+          for (int i = 0; i < refs.size(); i++) {
+            byte [] bs = refs.get(i);
+            int ordinal;
+            if (i <= 0) {
+              ordinal = Counts.UNDEFINED.ordinal();
+              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
+              context.getCounter(Counts.UNDEFINED).increment(1);
+            } else {
+              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
+              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
+            }
+          }
           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            // Print out missing row; doing get on reference gives info on when the referencer
+            // was added which can help a little debugging. This info is only available in mapper
+            // output -- the 'Linked List error Key...' log message above. What we emit here is
+            // useless for debugging.
             context.getCounter("undef", keyString).increment(1);
           }
         } else if (defCount > 0 && refs.size() == 0) {
           // node is defined but not referenced
-          context.write(new Text(keyString), new Text("none"));
+          context.write(key, UNREF);
           context.getCounter(Counts.UNREFERENCED).increment(1);
           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
             context.getCounter("unref", keyString).increment(1);
           }
         } else {
           if (refs.size() > 1) {
-            if (refsSb != null) {
-              context.write(new Text(keyString), new Text(refsSb.toString()));
+            // Skip first reference.
+            for (int i = 1; i < refs.size(); i++) {
+              context.write(key,
+                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
             }
             context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
           }
           // node is defined and referenced
           context.getCounter(Counts.REFERENCED).increment(1);
         }
+      }
 
+      /**
+       * Dump out extra info around references if there are any. Helps debugging.
+       * @return StringBuilder filled with references if any.
+       * @throws IOException
+       */
+      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
+          final List<byte []> refs)
+      throws IOException {
+        StringBuilder refsSb = null;
+        if (refs.isEmpty()) return refsSb;
+        refsSb = new StringBuilder();
+        String comma = "";
+        // If a row is a reference but has no define, print the content of the row that has
+        // this row as a 'prev'; it will help debug.  The missing row was written just before
+        // the row we are dumping out here.
+        TableName tn = getTableName(context.getConfiguration());
+        try (Table t = this.connection.getTable(tn)) {
+          for (byte [] ref : refs) {
+            Result r = t.get(new Get(ref));
+            List<Cell> cells = r.listCells();
+            String ts = (cells != null && !cells.isEmpty())?
+                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
+            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
+            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
+            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
+            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
+            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
+            String refRegionLocation = "";
+            String keyRegionLocation = "";
+            if (b != null && b.length > 0) {
+              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
+                HRegionLocation hrl = rl.getRegionLocation(b);
+                if (hrl != null) refRegionLocation = hrl.toString();
+                // Key here probably has trailing zeros on it.
+                hrl = rl.getRegionLocation(key.getBytes());
+                if (hrl != null) keyRegionLocation = hrl.toString();
+              }
+            }
+            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
+              ", refPrevEqualsKey=" +
+                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
+                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
+                ", ref row date=" + ts + ", jobStr=" + jobStr +
+                ", ref row count=" + count +
+                ", ref row regionLocation=" + refRegionLocation +
+                ", key row regionLocation=" + keyRegionLocation);
+            refsSb.append(comma);
+            comma = ",";
+            refsSb.append(Bytes.toStringBinary(ref));
+          }
+        }
+        return refsSb;
       }
     }
 
@@ -707,7 +1002,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
 
       job.setReducerClass(VerifyReducer.class);
-      job.setOutputFormatClass(TextOutputFormat.class);
+      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
+      job.setOutputKeyClass(BytesWritable.class);
+      job.setOutputValueClass(BytesWritable.class);
       TextOutputFormat.setOutputPath(job, outputDir);
 
       boolean success = job.waitForCompletion(true);
@@ -756,23 +1053,26 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
     protected void handleFailure(Counters counters) throws IOException {
       Configuration conf = job.getConfiguration();
-      HConnection conn = HConnectionManager.getConnection(conf);
       TableName tableName = getTableName(conf);
-      CounterGroup g = counters.getGroup("undef");
-      Iterator<Counter> it = g.iterator();
-      while (it.hasNext()) {
-        String keyString = it.next().getName();
-        byte[] key = Bytes.toBytes(keyString);
-        HRegionLocation loc = conn.relocateRegion(tableName, key);
-        LOG.error("undefined row " + keyString + ", " + loc);
-      }
-      g = counters.getGroup("unref");
-      it = g.iterator();
-      while (it.hasNext()) {
-        String keyString = it.next().getName();
-        byte[] key = Bytes.toBytes(keyString);
-        HRegionLocation loc = conn.relocateRegion(tableName, key);
-        LOG.error("unreferred row " + keyString + ", " + loc);
+      try (Connection conn = ConnectionFactory.createConnection(conf)) {
+        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
+          CounterGroup g = counters.getGroup("undef");
+          Iterator<Counter> it = g.iterator();
+          while (it.hasNext()) {
+            String keyString = it.next().getName();
+            byte[] key = Bytes.toBytes(keyString);
+            HRegionLocation loc = rl.getRegionLocation(key, true);
+            LOG.error("undefined row " + keyString + ", " + loc);
+          }
+          g = counters.getGroup("unref");
+          it = g.iterator();
+          while (it.hasNext()) {
+            String keyString = it.next().getName();
+            byte[] key = Bytes.toBytes(keyString);
+            HRegionLocation loc = rl.getRegionLocation(key, true);
+            LOG.error("unreferred row " + keyString + ", " + loc);
+          }
+        }
       }
     }
   }
@@ -944,7 +1244,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   /**
-   * A stand alone program that follows a linked list created by {@link Generator} and prints timing info.
+   * A stand alone program that follows a linked list created by {@link Generator} and prints
+   * timing info.
    */
   private static class Walker extends Configured implements Tool {
     @Override
@@ -1048,7 +1349,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   }
 
   private static class Clean extends Configured implements Tool {
-
     @Override public int run(String[] args) throws Exception {
       if (args.length < 1) {
         System.err.println("Usage: Clean <output dir>");
@@ -1136,16 +1436,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
   private void printCommands() {
     System.err.println("Commands:");
-    System.err.println(" Generator  Map only job that generates data.");
-    System.err.println(" Verify     A map reduce job that looks for holes. Look at the counts ");
+    System.err.println(" generator  Map only job that generates data.");
+    System.err.println(" verify     A map reduce job that looks for holes. Look at the counts ");
     System.err.println("            after running. See REFERENCED and UNREFERENCED are ok. Any ");
     System.err.println("            UNDEFINED counts are bad. Do not run with the Generator.");
-    System.err.println(" Walker     " +
-      "Standalong program that starts following a linked list & emits timing info.");
-    System.err.println(" Print      Standalone program that prints nodes in the linked list.");
-    System.err.println(" Delete     Standalone program that deletes a┬Ěsingle node.");
-    System.err.println(" Loop       Program to Loop through Generator and Verify steps");
-    System.err.println(" Clean      Program to clean all left over detritus.");
+    System.err.println(" walker     " +
+      "Standalone program that starts following a linked list & emits timing info.");
+    System.err.println(" print      Standalone program that prints nodes in the linked list.");
+    System.err.println(" delete     Standalone program that deletes a┬Ěsingle node.");
+    System.err.println(" loop       Program to Loop through Generator and Verify steps");
+    System.err.println(" clean      Program to clean all left over detritus.");
+    System.err.println(" search     Search for missing keys.");
     System.err.flush();
   }
 
@@ -1158,6 +1459,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       printUsage(this.getClass().getSimpleName() +
         " <general options> COMMAND [<COMMAND options>]", "General options:", "");
       printCommands();
+      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
@@ -1168,7 +1470,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   public int runTestFromCommandLine() throws Exception {
 
     Tool tool = null;
-    if (toRun.equals("Generator")) {
+    if (toRun.equalsIgnoreCase("Generator")) {
       tool = new Generator();
     } else if (toRun.equalsIgnoreCase("Verify")) {
       tool = new Verify();
@@ -1184,6 +1486,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       tool = new Delete();
     } else if (toRun.equalsIgnoreCase("Clean")) {
       tool = new Clean();
+    } else if (toRun.equalsIgnoreCase("Search")) {
+      tool = new Search();
     } else {
       usage();
       throw new RuntimeException("Unknown arg");
@@ -1227,4 +1531,4 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
     System.exit(ret);
   }
-}
\ No newline at end of file
+}
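
The new VerifyReducer above writes each problem row prefixed with a two-byte flag (the Counts
ordinal as a short), and the Search tool reads the flag back with whichType() and strips it with
getRowOnly(). Below is a minimal standalone sketch of that round trip, assuming only the layout
shown in the diff; the class name, row value, and use of java.nio.ByteBuffer are illustrative and
not part of the commit.

import java.nio.ByteBuffer;
import java.util.Arrays;

public class PrefixFlagSketch {
  // Mirrors the Counts enum in the diff: ordinals are the on-disk flag, so the order must not change.
  enum Counts { UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES }

  // Prepend the ordinal as a big-endian short, then the row bytes (same layout as addPrefixFlag).
  static byte[] addPrefixFlag(int ordinal, byte[] row) {
    return ByteBuffer.allocate(2 + row.length).putShort((short) ordinal).put(row).array();
  }

  // Read the flag back off the front (same idea as whichType).
  static Counts whichType(byte[] flagged) {
    return Counts.values()[ByteBuffer.wrap(flagged, 0, 2).getShort()];
  }

  // Strip the flag to recover the row bytes (same idea as getRowOnly).
  static byte[] rowOnly(byte[] flagged) {
    return Arrays.copyOfRange(flagged, 2, flagged.length);
  }

  public static void main(String[] args) {
    byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), "row-0001".getBytes());
    System.out.println(whichType(flagged));            // UNDEFINED
    System.out.println(new String(rowOnly(flagged)));  // row-0001
  }
}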

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index a04cb88..d682ccc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -221,9 +221,9 @@ public class HFileArchiver {
     }
 
     // otherwise we attempt to archive the store files
-    if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files.");
+    if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
 
-    // wrap the storefile into a File
+    // Wrap the storefile into a File
     StoreToFile getStorePath = new StoreToFile(fs);
     Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 67d9b0d..c20e375 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -128,14 +128,12 @@ public class WALPlayer extends Configured implements Tool {
    * A mapper that writes out {@link Mutation} to be directly applied to
    * a running HBase instance.
    */
-  static class WALMapper
+  protected static class WALMapper
   extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
-    private Map<TableName, TableName> tables =
-        new TreeMap<TableName, TableName>();
+    private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
 
     @Override
-    public void map(WALKey key, WALEdit value,
-      Context context)
+    public void map(WALKey key, WALEdit value, Context context)
     throws IOException {
       try {
         if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
@@ -150,27 +148,29 @@ public class WALPlayer extends Configured implements Tool {
             // filtering WAL meta entries
             if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
 
-            // A WALEdit may contain multiple operations (HBASE-3584) and/or
-            // multiple rows (HBASE-5229).
-            // Aggregate as much as possible into a single Put/Delete
-            // operation before writing to the context.
-            if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
-                || !CellUtil.matchingRow(lastCell, cell)) {
-              // row or type changed, write out aggregate KVs.
-              if (put != null) context.write(tableOut, put);
-              if (del != null) context.write(tableOut, del);
-
+            // Allow a subclass filter out this cell.
+            if (filter(context, cell)) {
+              // A WALEdit may contain multiple operations (HBASE-3584) and/or
+              // multiple rows (HBASE-5229).
+              // Aggregate as much as possible into a single Put/Delete
+              // operation before writing to the context.
+              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
+                  || !CellUtil.matchingRow(lastCell, cell)) {
+                // row or type changed, write out aggregate KVs.
+                if (put != null) context.write(tableOut, put);
+                if (del != null) context.write(tableOut, del);
+                if (CellUtil.isDelete(cell)) {
+                  del = new Delete(cell.getRow());
+                } else {
+                  put = new Put(cell.getRow());
+                }
+              }
               if (CellUtil.isDelete(cell)) {
-                del = new Delete(cell.getRow());
+                del.addDeleteMarker(cell);
               } else {
-                put = new Put(cell.getRow());
+                put.add(cell);
               }
             }
-            if (CellUtil.isDelete(cell)) {
-              del.addDeleteMarker(cell);
-            } else {
-              put.add(cell);
-            }
             lastCell = cell;
           }
           // write residual KVs
@@ -182,18 +182,30 @@ public class WALPlayer extends Configured implements Tool {
       }
     }
 
+    /**
+     * @param cell
+     * @return Return true if we are to emit this cell.
+     */
+    protected boolean filter(Context context, final Cell cell) {
+      return true;
+    }
+
     @Override
     public void setup(Context context) throws IOException {
       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+      if (tablesToUse == null && tableMap == null) {
+        // Then user wants all tables.
+      } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
         // this can only happen when WALMapper is used directly by a class other than WALPlayer
         throw new IOException("No tables or incorrect table mapping specified.");
       }
       int i = 0;
-      for (String table : tablesToUse) {
-        tables.put(TableName.valueOf(table),
+      if (tablesToUse != null) {
+        for (String table : tablesToUse) {
+          tables.put(TableName.valueOf(table),
             TableName.valueOf(tableMap[i++]));
+        }
       }
     }
   }
@@ -337,4 +349,4 @@ public class WALPlayer extends Configured implements Tool {
     Job job = createSubmittableJob(otherArgs);
     return job.waitForCompletion(true) ? 0 : 1;
   }
-}
+}
\ No newline at end of file
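
The WALPlayer change above makes WALMapper protected and adds a filter(Context, Cell) hook so a
subclass can drop cells before they are aggregated into Puts and Deletes; the commit's own
WALMapperSearcher uses it to look for missing keys. A minimal sketch of another subclass built on
the same hook follows, assuming only the API shown in the diff; the class names and the target
row below are made up.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class SingleRowWALPlayer extends WALPlayer {
  public SingleRowWALPlayer(Configuration conf) {
    super(conf);
  }

  // Mapper that replays only the cells belonging to one (hypothetical) row.
  public static class SingleRowMapper extends WALMapper {
    private static final byte[] TARGET_ROW = Bytes.toBytes("some-row");

    @Override
    protected boolean filter(Context context, Cell cell) {
      // Emit the cell only when its row equals the target row.
      return Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
          TARGET_ROW, 0, TARGET_ROW.length);
    }
  }

  @Override
  public Job createSubmittableJob(String[] args) throws IOException {
    Job job = super.createSubmittableJob(args);
    // Swap in the filtering mapper, as WALSearcher does above.
    job.setMapperClass(SingleRowMapper.class);
    return job;
  }
}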

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 8a7a362..9ac2084 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -167,7 +167,7 @@ public class SplitLogManager {
 
   /**
    * Get a list of paths that need to be split given a set of server-specific directories and
-   * optinally  a filter.
+   * optionally  a filter.
    *
    * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
    * layout.

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 05a5a9e..5a93a6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -251,8 +251,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     int deletedFileCount = 0;
     for (FileStatus file : filesToDelete) {
       Path filePath = file.getPath();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Removing: " + filePath + " from archive");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removing: " + filePath + " from archive");
       }
       try {
         boolean success = this.fs.delete(filePath, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 26f8943..c83841a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3666,11 +3666,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       internalFlushcache(null, seqid, stores.values(), status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
-    for (Path file: files) {
-      if (!fs.delete(file, false)) {
-        LOG.error("Failed delete of " + file);
-      } else {
-        LOG.debug("Deleted recovered.edits file=" + file);
+    if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+      // For debugging data loss issues!
+      // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
+      // column family. Have to fake out file type too by casting our recovered.edits as storefiles
+      String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
+      Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
+      for (Path file: files) {
+        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
+          null, null));
+      }
+      getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
+    } else {
+      for (Path file: files) {
+        if (!fs.delete(file, false)) {
+          LOG.error("Failed delete of " + file);
+        } else {
+          LOG.debug("Deleted recovered.edits file=" + file);
+        }
       }
     }
     return seqid;
@@ -3710,8 +3723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
       try {
         // How many edits seen before we check elapsed time
-        int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
-            2000);
+        int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
         // How often to send a progress report (default 1/2 master timeout)
         int period = this.conf.getInt("hbase.hstore.report.period", 300000);
         long lastReport = EnvironmentEdgeManager.currentTime();
@@ -3770,21 +3782,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
               continue;
             }
           }
+          // Check this edit is for this region.
+          if (!Bytes.equals(key.getEncodedRegionName(),
+              this.getRegionInfo().getEncodedNameAsBytes())) {
+            skippedEdits++;
+            continue;
+          }
 
           boolean flush = false;
           for (Cell cell: val.getCells()) {
             // Check this edit is for me. Also, guard against writing the special
             // METACOLUMN info such as HBASE::CACHEFLUSH entries
-            if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
-                !Bytes.equals(key.getEncodedRegionName(),
-                  this.getRegionInfo().getEncodedNameAsBytes())) {
+            if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
               //this is a special edit, we should handle it
               CompactionDescriptor compaction = WALEdit.getCompaction(cell);
               if (compaction != null) {
                 //replay the compaction
                 completeCompactionMarker(compaction);
               }
-
               skippedEdits++;
               continue;
             }
@@ -3810,10 +3825,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Once we are over the limit, restoreEdit will keep returning true to
             // flush -- but don't flush until we've played all the kvs that make up
             // the WALEdit.
-            if (!flush) {
-              flush = restoreEdit(store, cell);
-            }
-
+            flush |= restoreEdit(store, cell);
             editsCount++;
           }
           if (flush) {
@@ -5040,6 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @return qualified path of region directory
    */
   @Deprecated
+  @VisibleForTesting
   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
     return new Path(
       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
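
The HRegion change above adds a boolean property, hbase.region.archive.recovered.edits (default
false); when set, replayed recovered.edits files are archived as fake store files instead of being
deleted, which helps when chasing data-loss issues. A minimal sketch of enabling it follows; only
the property name is taken from the diff, the surrounding setup is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ArchiveRecoveredEditsConfig {
  // Build a configuration that keeps replayed recovered.edits around for post-mortem inspection.
  public static Configuration debugConf() {
    Configuration conf = HBaseConfiguration.create();
    // Default is false; true routes replayed recovered.edits into the hfile archive
    // (via the fake-family/StoreFile path shown in the diff) rather than deleting them.
    conf.setBoolean("hbase.region.archive.recovered.edits", true);
    return conf;
  }
}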

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 19b4719..0211a17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1018,6 +1018,7 @@ public class FSHLog implements WAL {
         i.preLogArchive(p, newPath);
       }
     }
+    LOG.info("Archiving " + p + " to " + newPath);
     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
       throw new IOException("Unable to rename " + p + " to " + newPath);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index f5b0269..0d0912e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1204,7 +1204,7 @@ public abstract class FSUtils {
     private List<String> blacklist;
 
     /**
-     * Create a filter on the give filesystem with the specified blacklist
+     * Create a filter on the given filesystem with the specified blacklist
      * @param fs filesystem to filter
      * @param directoryNameBlackList list of the names of the directories to filter. If
      *          <tt>null</tt>, all directories are returned

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 104faad..720cedc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -75,7 +75,7 @@ public class WALPrettyPrinter {
   // enable in order to output a single list of transactions from several files
   private boolean persistentOutput;
   private boolean firstTxn;
-  // useful for programatic capture of JSON output
+  // useful for programmatic capture of JSON output
   private PrintStream out;
   // for JSON encoding
   private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -267,8 +267,9 @@ public class WALPrettyPrinter {
           Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
           if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
           // check row output filter
-          if (row == null || ((String) op.get("row")).equals(row))
+          if (row == null || ((String) op.get("row")).equals(row)) {
             actions.add(op);
+          }
         }
         if (actions.size() == 0)
           continue;
@@ -283,22 +284,16 @@ public class WALPrettyPrinter {
           out.print(MAPPER.writeValueAsString(txn));
         } else {
           // Pretty output, complete with indentation by atomic action
-          out.println("Sequence " + txn.get("sequence") + " "
-              + "from region " + txn.get("region") + " " + "in table "
-              + txn.get("table") + " at write timestamp: " + new Date(writeTime));
+          out.println("Sequence=" + txn.get("sequence") + " "
+              + ", region=" + txn.get("region") + " at write timestamp=" + new Date(writeTime));
           for (int i = 0; i < actions.size(); i++) {
             Map op = actions.get(i);
-            out.println("  Action:");
-            out.println("    row: " + op.get("row"));
-            out.println("    column: " + op.get("family") + ":"
-                + op.get("qualifier"));
-            out.println("    timestamp: "
-                + (new Date((Long) op.get("timestamp"))));
-            if(op.get("tag") != null) {
+            out.println("row=" + op.get("row") +
+                ", column=" + op.get("family") + ":" + op.get("qualifier"));
+            if (op.get("tag") != null) {
               out.println("    tag: " + op.get("tag"));
             }
-            if (outputValues)
-              out.println("    value: " + op.get("value"));
+            if (outputValues) out.println("    value: " + op.get("value"));
           }
         }
       }
@@ -347,8 +342,6 @@ public class WALPrettyPrinter {
    *          Command line arguments
    * @throws IOException
    *           Thrown upon file system errors etc.
-   * @throws ParseException
-   *           Thrown if command-line parsing fails.
    */
   public static void run(String[] args) throws IOException {
     // create options
@@ -364,7 +357,7 @@ public class WALPrettyPrinter {
 
     WALPrettyPrinter printer = new WALPrettyPrinter();
     CommandLineParser parser = new PosixParser();
-    List files = null;
+    List<?> files = null;
     try {
       CommandLine cmd = parser.parse(options, args);
       files = cmd.getArgList();

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 2ddc9d1..c187af1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -49,13 +49,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
@@ -73,9 +68,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
-import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -83,9 +76,9 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
-import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager;
@@ -105,9 +98,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -115,17 +111,17 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
 
-// imports for things that haven't moved from regionserver.wal yet.
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -269,8 +265,7 @@ public class WALSplitter {
    * log splitting implementation, splits one log file.
    * @param logfile should be an actual log file.
    */
-  boolean splitLogFile(FileStatus logfile,
-      CancelableProgressable reporter) throws IOException {
+  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
     Preconditions.checkState(status == null);
     Preconditions.checkArgument(logfile.isFile(),
         "passed in file status is for something other than a regular file.");
@@ -399,8 +394,9 @@ public class WALSplitter {
       } finally {
         String msg =
             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
-                + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
-                + " progress failed = " + progress_failed;
+                + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
+                ", length=" + logfile.getLen() + // See if length got updated post lease recovery
+                ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
         LOG.info(msg);
         status.markComplete(msg);
       }
@@ -714,8 +710,8 @@ public class WALSplitter {
           throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId
-              + " ,maxSeqId=" + maxSeqId);
+          LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
+              + ", maxSeqId=" + maxSeqId);
         }
       } catch (FileAlreadyExistsException ignored) {
         // latest hdfs throws this exception. it's all right if newSeqIdFile already exists

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/test/data/0000000000000016310
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/data/0000000000000016310 b/hbase-server/src/test/data/0000000000000016310
new file mode 100644
index 0000000..8e58c98
Binary files /dev/null and b/hbase-server/src/test/data/0000000000000016310 differ
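To eyeball what is inside the new 0000000000000016310 test data file, the org.apache.hadoop.hbase.wal.WALPrettyPrinter utility can dump it. A minimal sketch, assuming WALPrettyPrinter#processFile is available as in the current wal package; the driver class name and the relative path are illustrative only:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.wal.WALPrettyPrinter;

  public class DumpRecoveredEditsFile {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // The default printer writes each WAL entry (sequence id, region, row, cells) to stdout.
      WALPrettyPrinter printer = new WALPrettyPrinter();
      printer.processFile(conf, new Path("hbase-server/src/test/data/0000000000000016310"));
    }
  }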

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index f981185..8613276 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2526,14 +2526,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Stops the previously started <code>MiniMRCluster</code>.
    */
   public void shutdownMiniMapReduceCluster() {
-    LOG.info("Stopping mini mapreduce cluster...");
     if (mrCluster != null) {
+      LOG.info("Stopping mini mapreduce cluster...");
       mrCluster.shutdown();
       mrCluster = null;
+      LOG.info("Mini mapreduce cluster stopped");
     }
     // Restore configuration to point to local jobtracker
     conf.set("mapreduce.jobtracker.address", "local");
-    LOG.info("Mini mapreduce cluster stopped");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/82587143/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
new file mode 100644
index 0000000..3d651ef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mortbay.log.Log;
+
+/**
+ * Tests around replay of recovered.edits content.
+ */
+@Category({MediumTests.class})
+public class TestRecoveredEdits {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  @Rule public TestName testName = new TestName();
+
+  /**
+   * HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask.
+   * Create a region. Close it. Then copy into place a file to replay, one that is bigger than
+   * the configured flush size so replay brings on lots of flushes. Then reopen and confirm all
+   * edits made it in.
+   * @throws IOException
+   */
+  @Test (timeout=30000)
+  public void testReplayWorksThoughLotsOfFlushing() throws IOException {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    // Set it so we flush every 1M or so. That's a lot.
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+    // The file of recovered edits has a column family of 'meta'. It also has an encoded region
+    // name of 4823016d8fca70b25503ee07f4c6d79f, which needs to match on replay.
+    final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f";
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName.getMethodName()));
+    final String columnFamily = "meta";
+    byte [][] columnFamilyAsByteArray = new byte [][] {Bytes.toBytes(columnFamily)};
+    htd.addFamily(new HColumnDescriptor(columnFamily));
+    HRegionInfo hri = new HRegionInfo(htd.getTableName()) {
+      @Override
+      public synchronized String getEncodedName() {
+        return encodedRegionName;
+      }
+
+      // Cache the name because it is looked up frequently.
+      private byte [] encodedRegionNameAsBytes = null;
+      @Override
+      public synchronized byte[] getEncodedNameAsBytes() {
+        if (encodedRegionNameAsBytes == null) {
+          this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName());
+        }
+        return this.encodedRegionNameAsBytes;
+      }
+    };
+    Path hbaseRootDir = TEST_UTIL.getDataTestDir();
+    HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null);
+    assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
+    List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+    // There should be no store files.
+    assertTrue(storeFiles.isEmpty());
+    region.close();
+    Path regionDir = region.getRegionDir(hbaseRootDir, hri);
+    Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
+    // Getting the path to this ~10MB file of edits under the test data dir is a little fragile.
+    Path recoveredEditsFile = new Path(new Path(
+      System.getProperty("project.build.testSourceDirectory", "src" + Path.SEPARATOR + "test"),
+      "data"), "0000000000000016310");
+    // Copy this file under the region's recovered.edits dir so it is replayed on reopen.
+    FileSystem fs = FileSystem.get(conf);
+    Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName());
+    fs.copyToLocalFile(recoveredEditsFile, destination);
+    assertTrue(fs.exists(destination));
+    // Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay.
+    region = HRegion.openHRegion(region, null);
+    assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
+    storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+    // Our 0000000000000016310 is ~10MB and most of its edits are for this one region. Flushing at
+    // 1MB, the replay of those edits should leave more than ten flushed store files behind.
+    assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
+    // Now verify all edits made it into the region.
+    int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region);
+    Log.info("Checked " + count + " edits made it in");
+  }
+
+  /**
+   * @param fs Filesystem the edits file lives on.
+   * @param conf Configuration to use when opening the reader.
+   * @param edits Path to the recovered edits file to verify against the region content.
+   * @param region Region that replayed the edits.
+   * @return Count of edits seen in the file.
+   * @throws IOException
+   */
+  private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
+      final Path edits, final HRegion region)
+  throws IOException {
+    int count = 0;
+    // Based on HRegion#replayRecoveredEdits
+    WAL.Reader reader = null;
+    try {
+      reader = WALFactory.createReader(fs, edits, conf);
+      WAL.Entry entry;
+      while ((entry = reader.next()) != null) {
+        WALKey key = entry.getKey();
+        WALEdit val = entry.getEdit();
+        count++;
+        // Check this edit is for this region.
+        if (!Bytes.equals(key.getEncodedRegionName(),
+            region.getRegionInfo().getEncodedNameAsBytes())) {
+          continue;
+        }
+        Cell previous = null;
+        for (Cell cell: val.getCells()) {
+          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
+          if (previous != null && CellComparator.compareRows(previous, cell) == 0) continue;
+          previous = cell;
+          Get g = new Get(CellUtil.cloneRow(cell));
+          Result r = region.get(g);
+          boolean found = false;
+          for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
+            Cell current = scanner.current();
+            if (CellComparator.compare(cell, current, true) == 0) {
+              found = true;
+              break;
+            }
+          }
+          assertTrue("Failed to find " + cell, found);
+        }
+      }
+    } finally {
+      if (reader != null) reader.close();
+    }
+    return count;
+  }
+}
\ No newline at end of file
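To run just the new test, something like mvn test -Dtest=TestRecoveredEdits from the hbase-server module should pick it up under the usual Surefire setup; exact profile flags vary by branch, so treat that invocation as an assumption.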

