hbase-commits mailing list archives

From bus...@apache.org
Subject hbase git commit: HBASE-15984 Handle premature EOF treatment of WALs in replication.
Date Thu, 29 Sep 2016 19:54:59 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 e0066e713 -> 39a79d50f


HBASE-15984 Handle premature EOF treatment of WALs in replication.

In some particular deployments, the Replication code believes it has
reached EOF for a WAL prior to successfully parsing all bytes known to
exist in a cleanly closed file.

Consistently this failure happens due to an InvalidProtobufException
after some number of seeks during our attempts to tail the in-progress
RegionServer WAL. As a work-around, this patch treats cleanly closed
files differently than other execution paths. If an EOF is detected due
to parsing or other errors while there are still unparsed bytes before
the end-of-file trailer, we now reset the WAL to the very beginning and
attempt a clean read-through.
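
The decision described above can be sketched roughly as follows. This is
an illustration only, not the patched HBase code: the class name, the
enum, and the method are hypothetical, and the three inputs stand in for
the trailer size, reader position, and file length that the patch
compares.

```java
/** Illustrative sketch only; the names here are hypothetical, not HBase classes. */
final class WalEndOfFileCheck {
  enum Action { FINISH_FILE, FINISH_FILE_UNCLEAN, RESTART_FROM_BEGINNING }

  /**
   * @param trailerSize     footer size in bytes, or -1 if the WAL was not closed cleanly
   * @param currentPosition offset the reader has parsed up to
   * @param fileLength      file length reported by the file system
   */
  static Action decide(long trailerSize, long currentPosition, long fileLength) {
    if (trailerSize < 0) {
      // No trailer: the file was not closed cleanly, so trailing bytes are skipped.
      return Action.FINISH_FILE_UNCLEAN;
    }
    if (currentPosition + trailerSize < fileLength) {
      // Cleanly closed, but unparsed bytes remain before the trailer: read again from 0.
      return Action.RESTART_FROM_BEGINNING;
    }
    // Position plus trailer reaches the end of the file: the WAL is complete.
    return Action.FINISH_FILE;
  }
}
```

For example, a cleanly closed file of 200 bytes with a 50-byte footer is
restarted when the reader stops at offset 100, and finished when it
stops at offset 150.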

In current testing, a single such reset is sufficient to work around
observed dataloss. However, the above change will retry a given WAL file
indefinitely. On each such attempt, a log message like the below will
be emitted at the WARN level:

  Processing end of WAL file '{}'. At position {}, which is too far away
  from reported file length {}. Restarting WAL reading (see HBASE-15983
  for details).

Additionally, this patch adds log detail at the TRACE level about file
offsets seen while handling recoverable errors. It also adds metrics
that measure the use of this recovery mechanism.

 Conflicts:
	hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
	hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39a79d50
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39a79d50
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39a79d50

Branch: refs/heads/branch-1.3
Commit: 39a79d50f1bde8ec54e08e7c249ba07562a30f63
Parents: e0066e7
Author: Sean Busbey <busbey@apache.org>
Authored: Tue Jun 7 16:00:46 2016 -0500
Committer: Sean Busbey <busbey@apache.org>
Committed: Thu Sep 29 13:01:43 2016 -0500

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         | 17 ++++
 .../MetricsReplicationGlobalSourceSource.java   | 45 +++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 81 ++++++++++++++++++++
 .../regionserver/wal/ProtobufLogReader.java     | 45 +++++++++--
 .../replication/regionserver/MetricsSource.java | 35 +++++++++
 .../regionserver/ReplicationSource.java         | 39 ++++++++--
 .../ReplicationWALReaderManager.java            | 10 +++
 src/main/asciidoc/_chapters/ops_mgt.adoc        | 24 +++++-
 8 files changed, 280 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..1ed5a6b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -38,6 +38,16 @@ public interface MetricsReplicationSourceSource {
   public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles";
   public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
 
+  public static final String SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH =
+      "source.closedLogsWithUnknownFileLength";
+  public static final String SOURCE_UNCLEANLY_CLOSED_LOGS = "source.uncleanlyClosedLogs";
+  public static final String SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES =
+      "source.ignoredUncleanlyClosedLogContentsInBytes";
+  public static final String SOURCE_RESTARTED_LOG_READING = "source.restartedLogReading";
+  public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
+  public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
+  public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+
   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
   void decrSizeOfLogQueue(int size);
@@ -53,4 +63,11 @@ public interface MetricsReplicationSourceSource {
   void incrHFilesShipped(long hfiles);
   void incrSizeOfHFileRefsQueue(long size);
   void decrSizeOfHFileRefsQueue(long size);
+  void incrUnknownFileLengthForClosedWAL();
+  void incrUncleanlyClosedWALs();
+  void incrBytesSkippedInUncleanlyClosedWALs(final long bytes);
+  void incrRestartedWALReading();
+  void incrRepeatedFileBytes(final long bytes);
+  void incrCompletedWAL();
+  void incrCompletedRecoveryQueue();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..367c49b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -36,6 +36,13 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableFastCounter logReadInBytesCounter;
   private final MutableFastCounter shippedHFilesCounter;
   private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
+  private final MutableFastCounter unknownFileLengthForClosedWAL;
+  private final MutableFastCounter uncleanlyClosedWAL;
+  private final MutableFastCounter uncleanlyClosedSkippedBytes;
+  private final MutableFastCounter restartWALReading;
+  private final MutableFastCounter repeatedFileBytes;
+  private final MutableFastCounter completedWAL;
+  private final MutableFastCounter completedRecoveryQueue;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -62,6 +69,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
 
     sizeOfHFileRefsQueueGauge =
         rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
+
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
+    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
+    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
+    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
+    completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -143,4 +158,34 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+  @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..f3e6533 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -24,6 +24,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
   private final MetricsReplicationSourceImpl rms;
   private final String id;
+  private final String keyPrefix;
   private final String sizeOfLogQueueKey;
   private final String ageOfLastShippedOpKey;
   private final String logReadInEditsKey;
@@ -49,9 +50,25 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter shippedHFilesCounter;
   private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
 
+  private final String unknownFileLengthKey;
+  private final String uncleanlyClosedKey;
+  private final String uncleanlySkippedBytesKey;
+  private final String restartedKey;
+  private final String repeatedBytesKey;
+  private final String completedLogsKey;
+  private final String completedRecoveryKey;
+  private final MutableFastCounter unknownFileLengthForClosedWAL;
+  private final MutableFastCounter uncleanlyClosedWAL;
+  private final MutableFastCounter uncleanlyClosedSkippedBytes;
+  private final MutableFastCounter restartWALReading;
+  private final MutableFastCounter repeatedFileBytes;
+  private final MutableFastCounter completedWAL;
+  private final MutableFastCounter completedRecoveryQueue;
+
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
     ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
@@ -85,6 +102,27 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
     sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
+
+    unknownFileLengthKey = this.keyPrefix + "closedLogsWithUnknownFileLength";
+    unknownFileLengthForClosedWAL = rms.getMetricsRegistry().getCounter(unknownFileLengthKey, 0L);
+
+    uncleanlyClosedKey = this.keyPrefix + "uncleanlyClosedLogs";
+    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(uncleanlyClosedKey, 0L);
+
+    uncleanlySkippedBytesKey = this.keyPrefix + "ignoredUncleanlyClosedLogContentsInBytes";
+    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry().getCounter(uncleanlySkippedBytesKey, 0L);
+
+    restartedKey = this.keyPrefix + "restartedLogReading";
+    restartWALReading = rms.getMetricsRegistry().getCounter(restartedKey, 0L);
+
+    repeatedBytesKey = this.keyPrefix + "repeatedLogFileBytes";
+    repeatedFileBytes = rms.getMetricsRegistry().getCounter(repeatedBytesKey, 0L);
+
+    completedLogsKey = this.keyPrefix + "completedLogs";
+    completedWAL = rms.getMetricsRegistry().getCounter(completedLogsKey, 0L);
+
+    completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
+    completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -142,6 +180,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     rms.removeMetric(shippedHFilesKey);
     rms.removeMetric(sizeOfHFileRefsQueueKey);
+
+    rms.removeMetric(unknownFileLengthKey);
+    rms.removeMetric(uncleanlyClosedKey);
+    rms.removeMetric(uncleanlySkippedBytesKey);
+    rms.removeMetric(restartedKey);
+    rms.removeMetric(repeatedBytesKey);
+    rms.removeMetric(completedLogsKey);
+    rms.removeMetric(completedRecoveryKey);
   }
 
   @Override
@@ -168,4 +214,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+  @Override
+  public void incrUnknownFileLengthForClosedWAL() {
+    unknownFileLengthForClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrUncleanlyClosedWALs() {
+    uncleanlyClosedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    uncleanlyClosedSkippedBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrRestartedWALReading() {
+    restartWALReading.incr(1L);
+  }
+
+  @Override
+  public void incrRepeatedFileBytes(final long bytes) {
+    repeatedFileBytes.incr(bytes);
+  }
+
+  @Override
+  public void incrCompletedWAL() {
+    completedWAL.incr(1L);
+  }
+
+  @Override
+  public void incrCompletedRecoveryQueue() {
+    completedRecoveryQueue.incr(1L);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index bb25aa1..9fd171f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -96,6 +96,21 @@ public class ProtobufLogReader extends ReaderBase {
   // cell codec classname
   private String codecClsName = null;
 
+  @InterfaceAudience.Private
+  public long trailerSize() {
+    if (trailerPresent) {
+      // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer
+      final long calculatedSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
+      final long expectedSize = fileLength - walEditsStopOffset;
+      if (expectedSize != calculatedSize) {
+        LOG.warn("After parsing the trailer, we expect the total footer to be "+ expectedSize +" bytes, but we calculate it as being " + calculatedSize);
+      }
+      return expectedSize;
+    } else {
+      return -1L;
+    }
+  }
+
   enum WALHdrResult {
     EOF,                   // stream is at EOF when method starts
     SUCCESS,
@@ -216,7 +231,7 @@ public class ProtobufLogReader extends ReaderBase {
     this.seekOnFs(currentPosition);
     if (LOG.isTraceEnabled()) {
       LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
-          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
+          + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
     }
     
     codecClsName = hdrCtxt.getCellCodecClsName();
@@ -312,6 +327,9 @@ public class ProtobufLogReader extends ReaderBase {
       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
       long originalPosition = this.inputStream.getPos();
       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Reached end of expected edits area at offset " + originalPosition);
+        }
         return false;
       }
       WALKey.Builder builder = WALKey.newBuilder();
@@ -321,7 +339,7 @@ public class ProtobufLogReader extends ReaderBase {
         try {
           int firstByte = this.inputStream.read();
           if (firstByte == -1) {
-            throw new EOFException("First byte is negative");
+            throw new EOFException("First byte is negative at offset " + originalPosition);
           }
           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
           // available may be < 0 on local fs for instance.  If so, can't depend on it.
@@ -329,7 +347,7 @@ public class ProtobufLogReader extends ReaderBase {
           if (available > 0 && available < size) {
             throw new EOFException("Available stream not enough for edit, " +
                 "inputStream.available()= " + this.inputStream.available() + ", " +
-                "entry size= " + size);
+                "entry size= " + size + " at offset = " + this.inputStream.getPos());
           }
           ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
             (int)size);
@@ -342,12 +360,14 @@ public class ProtobufLogReader extends ReaderBase {
           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
           //       If we can get the KV count, we could, theoretically, try to get next record.
           throw new EOFException("Partial PB while reading WAL, " +
-              "probably an unexpected EOF, ignoring");
+              "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
         }
         WALKey walKey = builder.build();
         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
-          LOG.trace("WALKey has no KVs that follow it; trying the next one");
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
+          }
           continue;
         }
         int expectedCells = walKey.getFollowingKvCount();
@@ -362,7 +382,9 @@ public class ProtobufLogReader extends ReaderBase {
           try {
             posAfterStr = this.inputStream.getPos() + "";
           } catch (Throwable t) {
-            LOG.trace("Error getting pos for error message - ignoring", t);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Error getting pos for error message - ignoring", t);
+            }
           }
           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
               + posBefore + " and read up to " + posAfterStr;
@@ -377,11 +399,18 @@ public class ProtobufLogReader extends ReaderBase {
           throw new EOFException("Read WALTrailer while reading WALEdits");
         }
       } catch (EOFException eof) {
-        LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
-        if (originalPosition < 0) throw eof;
+        if (originalPosition < 0) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);
+          }
+          throw eof;
+        }
         // Else restore our position to original location in hope that next time through we will
         // read successfully.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);
+        }
         seekOnFs(originalPosition);
         return false;
       }
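
The footer-size check that trailerSize() performs in the hunk above can
be illustrated with plain arithmetic. This is a minimal sketch under
stated assumptions: the class and method names are hypothetical, and the
magic length and serialized trailer size are passed in as parameters
rather than read from a real WAL.

```java
/** Illustrative sketch only; not part of ProtobufLogReader. */
final class WalFooterSizeSketch {
  /** Size in bytes of the int that stores the trailer length. */
  static final int SIZEOF_INT = 4;

  /** Footer size built up from its parts: completion magic + trailer length int + trailer. */
  static long calculatedFooterSize(int completeMagicLength, int trailerSerializedSize) {
    return (long) completeMagicLength + SIZEOF_INT + trailerSerializedSize;
  }

  /** Footer size implied by where the edits area stops relative to the end of the file. */
  static long expectedFooterSize(long fileLength, long walEditsStopOffset) {
    return fileLength - walEditsStopOffset;
  }
}
```

With a 4-byte magic and a 10-byte trailer the calculated footer is 18
bytes, which matches a 1018-byte file whose edits stop at offset 1000.
When the two values differ, the patch logs a warning and returns the
expected size.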

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..2256cd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -227,4 +227,39 @@ public class MetricsSource {
       lastHFileRefsQueueSize = 0;
     }
   }
+
+  public void incrUnknownFileLengthForClosedWAL() {
+    singleSourceSource.incrUnknownFileLengthForClosedWAL();
+    globalSourceSource.incrUnknownFileLengthForClosedWAL();
+  }
+
+  public void incrUncleanlyClosedWALs() {
+    singleSourceSource.incrUncleanlyClosedWALs();
+    globalSourceSource.incrUncleanlyClosedWALs();
+  }
+
+  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
+    singleSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
+    globalSourceSource.incrBytesSkippedInUncleanlyClosedWALs(bytes);
+  }
+
+  public void incrRestartedWALReading() {
+    singleSourceSource.incrRestartedWALReading();
+    globalSourceSource.incrRestartedWALReading();
+  }
+
+  public void incrRepeatedFileBytes(final long bytes) {
+    singleSourceSource.incrRepeatedFileBytes(bytes);
+    globalSourceSource.incrRepeatedFileBytes(bytes);
+  }
+
+  public void incrCompletedWAL() {
+    singleSourceSource.incrCompletedWAL();
+    globalSourceSource.incrCompletedWAL();
+  }
+
+  public void incrCompletedRecoveryQueue() {
+    singleSourceSource.incrCompletedRecoveryQueue();
+    globalSourceSource.incrCompletedRecoveryQueue();
+  }
 }
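
Each new method in MetricsSource updates both the per-peer source and
the global source. A minimal sketch of that fan-out follows, using a
plain map of counters instead of the Hadoop metrics registry. The class
and its methods are hypothetical. The key names follow the
"source.<name>" and "source.<id>.<name>" patterns shown in the diffs
above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch only; stands in for the real metrics registry. */
final class RestartMetricsSketch {
  private final Map<String, AtomicLong> registry = new ConcurrentHashMap<>();

  private void incr(String key, long delta) {
    registry.computeIfAbsent(key, k -> new AtomicLong()).addAndGet(delta);
  }

  /** Records one WAL restart for a peer, updating the global and per-peer counters. */
  void recordRestart(String peerId, long bytesAlreadyRead) {
    incr("source.restartedLogReading", 1L);
    incr("source." + peerId + ".restartedLogReading", 1L);
    incr("source.repeatedLogFileBytes", bytesAlreadyRead);
    incr("source." + peerId + ".repeatedLogFileBytes", bytesAlreadyRead);
  }

  /** Returns the current value of a counter, or 0 if it was never incremented. */
  long get(String key) {
    AtomicLong value = registry.get(key);
    return value == null ? 0L : value.get();
  }
}
```

The global counter therefore equals the sum of the per-peer counters,
which is what lets an operator see both cluster-wide and peer-specific
restart activity.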

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 387427e..17bf6e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1087,24 +1087,49 @@ public class ReplicationSource extends Thread
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
         justification = "Yeah, this is how it works")
     protected boolean processEndOfFile() {
+      // We presume this means the file we're reading is closed.
       if (this.queue.size() != 0) {
-        if (LOG.isTraceEnabled()) {
-          String filesize = "N/A";
-          try {
-            FileStatus stat = fs.getFileStatus(this.currentPath);
-            filesize = stat.getLen() + "";
-          } catch (IOException ex) {
+        // -1 means the wal wasn't closed cleanly.
+        final long trailerSize = this.repLogReader.currentTrailerSize();
+        final long currentPosition = this.repLogReader.getPosition();
+        FileStatus stat = null;
+        try {
+          stat = fs.getFileStatus(this.currentPath);
+        } catch (IOException exception) {
+          LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
+              + ", stats: " + getStats());
+          metrics.incrUnknownFileLengthForClosedWAL();
+        }
+        if (stat != null) {
+          if (trailerSize < 0) {
+            if (currentPosition < stat.getLen()) {
+              final long skippedBytes = stat.getLen() - currentPosition;
+              LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
+              metrics.incrUncleanlyClosedWALs();
+              metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+            }
+          } else if (currentPosition + trailerSize < stat.getLen()){
+            LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
+                ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
+            repLogReader.setPosition(0);
+            metrics.incrRestartedWALReading();
+            metrics.incrRepeatedFileBytes(currentPosition);
+            return false;
           }
+        }
+        if (LOG.isTraceEnabled()) {
           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
-              + ", and the length of the file is " + filesize);
+              + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
         }
         this.currentPath = null;
         this.repLogReader.finishCurrentFile();
         this.reader = null;
+        metrics.incrCompletedWAL();
         return true;
       } else if (this.replicationQueueInfo.isQueueRecovered()) {
         LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
             + peerClusterZnode);
+        metrics.incrCompletedRecoveryQueue();
         workerRunning = false;
         return true;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
index b63f66b..3558d08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -119,6 +120,15 @@ public class ReplicationWALReaderManager {
     this.position = pos;
   }
 
+  public long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader)reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
   /**
    * Close the current reader
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/39a79d50/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index fc91a18..2ec3200 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1577,7 +1577,7 @@ The new layout will be:
 
 === Replication Metrics
 
-The following metrics are exposed at the global region server level and (since HBase 0.95) at the peer level:
+The following metrics are exposed at the global region server level and at the peer level:
 
 `source.sizeOfLogQueue`::
   number of WALs to process (excludes the one which is being processed) at the Replication source
@@ -1591,6 +1591,28 @@ The following metrics are exposed at the global region server level and (since H
 `source.ageOfLastShippedOp`::
   age of last batch that was shipped by the replication source
 
+`source.completedLogs`::
+  The number of write-ahead-log files that have completed their acknowledged sending to the peer associated with this source. Increments to this metric are a part of normal operation of HBase replication.
+
+`source.completedRecoverQueues`::
+  The number of recovery queues this source has completed sending to the associated peer. Increments to this metric are a part of normal recovery of HBase replication in the face of failed Region Servers.
+
+`source.uncleanlyClosedLogs`::
+  The number of write-ahead-log files the replication system considered completed after reaching the end of readable entries in the face of an uncleanly closed file.
+
+`source.ignoredUncleanlyClosedLogContentsInBytes`::
+  When a write-ahead-log file is not closed cleanly, there will likely be some entry that has been partially serialized. This metric contains the number of bytes of such entries the HBase replication system believes were remaining at the end of files skipped in the face of an uncleanly closed file. Those bytes should either be in different file or represent a client write that was not acknowledged.
+
+`source.restartedLogReading`::
+  The number of times the HBase replication system detected that it failed to correctly parse a cleanly closed write-ahead-log file. In this circumstance, the system replays the entire log from the beginning, ensuring that no edits fail to be acknowledged by the associated peer. Increments to this metric indicate that the HBase replication system is having difficulty correctly handling failures in the underlying distributed storage system. No dataloss should occur, but you should check Region Server log files for details of the failures.
+
+`source.repeatedLogFileBytes`::
+  When the HBase replication system determines that it needs to replay a given write-ahead-log file, this metric is incremented by the number of bytes the replication system believes had already been acknowledged by the associated peer prior to starting over.
+
+`source.closedLogsWithUnknownFileLength`::
+  Incremented when the HBase replication system believes it is at the end of a write-ahead-log file but it can not determine the length of that file in the underlying distributed storage system. Could indicate dataloss since the replication system is unable to determine if the end of readable entries lines up with the expected end of the file. You should check Region Server log files for details of the failures.
+
+
 === Replication Configuration Options
 
 [cols="1,1,1", options="header"]
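
The guidance in the metric descriptions above could be turned into a
simple check over a metrics snapshot. The sketch below is hypothetical:
the class, the method, and the idea of passing the metrics as a map are
assumptions made for illustration. Only the metric names and their
meaning come from the documentation added by this patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not an HBase API. */
final class ReplicationMetricsTriage {
  /** Returns operator-facing warnings for the two metrics that call for a log review. */
  static List<String> warnings(Map<String, Long> metrics) {
    List<String> out = new ArrayList<>();
    if (metrics.getOrDefault("source.restartedLogReading", 0L) > 0) {
      // Documented as: no dataloss expected, but check Region Server logs.
      out.add("WAL reading was restarted; check Region Server logs");
    }
    if (metrics.getOrDefault("source.closedLogsWithUnknownFileLength", 0L) > 0) {
      // Documented as: could indicate dataloss.
      out.add("file length unknown for a closed WAL; possible dataloss");
    }
    return out;
  }
}
```

Counters such as source.completedLogs are part of normal operation and
produce no warning in this sketch.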

