hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-15252 Data loss when replaying wal if HDFS timeout
Date Fri, 12 Feb 2016 08:51:01 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 5716dda86 -> 7fe910778


HBASE-15252 Data loss when replaying wal if HDFS timeout

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fe91077
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fe91077
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fe91077

Branch: refs/heads/0.98
Commit: 7fe91077826374a5b360f7362f1df5db3a48b985
Parents: 5716dda
Author: zhangduo <zhangduo@apache.org>
Authored: Fri Feb 12 08:17:10 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Fri Feb 12 16:46:52 2016 +0800

----------------------------------------------------------------------
 .../regionserver/wal/ProtobufLogReader.java     |   3 +-
 .../hbase/regionserver/wal/TestWALReplay.java   | 117 ++++++++++++++++++-
 2 files changed, 115 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe91077/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 8c2da41..a1377cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A Protobuf based WAL has the following structure:
@@ -293,7 +294,7 @@ public class ProtobufLogReader extends ReaderBase {
           }
           ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
             (int)size);
-        } catch (IOException ipbe) {
+        } catch (InvalidProtocolBufferException ipbe) {
           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
             originalPosition + ", currentPosition=" + this.inputStream.getPos() +
             ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7fe91077/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index b3e7bba..f8d88b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -21,8 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
+import java.io.FilterInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +53,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -71,12 +77,14 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -84,6 +92,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Test replay of edits out of a WAL split.
@@ -477,7 +487,7 @@ public class TestWALReplay {
     boolean first = true;
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
-      if (first ) {
+      if (first) {
         // If first, so we have at least one family w/ different seqid to rest.
         region.flushcache();
         first = false;
@@ -805,8 +815,9 @@ public class TestWALReplay {
     final Configuration newConf = HBaseConfiguration.create(this.conf);
     User user = HBaseTestingUtility.getDifferentUser(newConf,
       ".replay.wal.secondtime");
-    user.runAs(new PrivilegedExceptionAction() {
-      public Object run() throws Exception {
+    user.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
         runWALSplit(newConf);
         FileSystem newFS = FileSystem.get(newConf);
         // 100k seems to make for about 4 flushes during HRegion#initialize.
@@ -896,6 +907,104 @@ public class TestWALReplay {
         lastestSeqNumber, editCount);
   }
 
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
+   */
+  @Test
+  public void testDatalossWhenInputError() throws IOException, InstantiationException,
+      IllegalAccessException {
+    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region1 = HRegion.createHRegion(hri,
+      hbaseRootDir, this.conf, htd);
+    Path regionDir = region1.getRegionFileSystem().getRegionDir();
+    HRegion.closeHRegion(region1);
+
+    HLog wal = createWAL(this.conf);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect.
+    region.close(true);
+    wal.close();
+
+    runWALSplit(this.conf);
+
+    // here we let the DFSInputStream throw an IOException just after the WALHeader.
+    Path editFile = HLogUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
+    FSDataInputStream stream = fs.open(editFile);
+    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
+    Class<? extends HLog.Reader> logReaderClass =
+        conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+          HLog.Reader.class);
+    HLog.Reader reader = logReaderClass.newInstance();
+    reader.init(this.fs, editFile, conf, stream);
+    final long headerLength = stream.getPos();
+    reader.close();
+    FileSystem spyFs = spy(this.fs);
+    doAnswer(new Answer<FSDataInputStream>() {
+
+      @Override
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
+        Field field = FilterInputStream.class.getDeclaredField("in");
+        field.setAccessible(true);
+        final DFSInputStream in = (DFSInputStream) field.get(stream);
+        DFSInputStream spyIn = spy(in);
+        doAnswer(new Answer<Integer>() {
+
+          private long pos;
+
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            if (pos >= headerLength) {
+              throw new IOException("read over limit");
+            }
+            int b = (Integer) invocation.callRealMethod();
+            if (b > 0) {
+              pos += b;
+            }
+            return b;
+          }
+        }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
+        doAnswer(new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            invocation.callRealMethod();
+            in.close();
+            return null;
+          }
+        }).when(spyIn).close();
+        field.set(stream, spyIn);
+        return stream;
+      }
+    }).when(spyFs).open(eq(editFile));
+
+    HLog wal2 = createWAL(this.conf);
+    HRegion region2;
+    try {
+      // log replay should fail due to the IOException, otherwise we may lose data.
+      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
+      assertEquals(result.size(), region2.get(g).size());
+    } catch (IOException e) {
+      assertEquals("read over limit", e.getMessage());
+    }
+    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
+    assertEquals(result.size(), region2.get(g).size());
+  }
+
   static class MockHLog extends FSHLog {
     boolean doCompleteCacheFlush = false;
 


Mime
View raw message