hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-15441 Fix WAL splitting when region has moved multiple times
Date Wed, 16 Mar 2016 23:37:17 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 3adcc750e -> ecec35ae4


HBASE-15441 Fix WAL splitting when region has moved multiple times

Summary:
Currently WAL splitting is broken when a region has been opened multiple times in recent minutes.

Region open and region close write event markers to the WAL. These markers should have the
sequence id in them. However, they are currently getting a sequence id of 1. That means that if
a region has moved multiple times in the last few minutes, multiple split log workers will try
to create the recovered edits file for sequence id 1. One of the workers will fail, and on
failing it will delete the recovered edits, causing all WAL split attempts to fail.

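We need to stop the bogus marker from being written and make WAL splitting tolerate markers
that have already been written.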

It appears that the close event with a sequence id of one is coming from region warm up.

This patch fixes that by making sure the close on warm up doesn't happen. Also, splitting will
now ignore any region event markers that are already in the logs.

Test Plan: Unit tests pass

Differential Revision: https://reviews.facebook.net/D55557


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecec35ae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecec35ae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecec35ae

Branch: refs/heads/master
Commit: ecec35ae4e6867da3b42d674f6eccbe8a9f7d533
Parents: 3adcc75
Author: Elliott Clark <eclark@apache.org>
Authored: Thu Mar 10 14:48:57 2016 -0800
Committer: Elliott Clark <eclark@apache.org>
Committed: Wed Mar 16 16:26:09 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 29 +++++++--
 .../hadoop/hbase/regionserver/HRegion.java      | 10 +--
 .../hadoop/hbase/regionserver/wal/WALEdit.java  | 14 +++--
 .../RegionReplicaReplicationEndpoint.java       |  5 ++
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 24 +++++++
 .../apache/hadoop/hbase/wal/TestWALSplit.java   | 66 ++++++++++++++++++--
 6 files changed, 129 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 9c71d97..2636777 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2629,15 +2629,36 @@ public final class ProtobufUtil {
   public static RegionEventDescriptor toRegionEventDescriptor(
       EventType eventType, HRegionInfo hri, long seqId, ServerName server,
       Map<byte[], List<Path>> storeFiles) {
+    final byte[] tableNameAsBytes = hri.getTable().getName();
+    final byte[] encodedNameAsBytes = hri.getEncodedNameAsBytes();
+    final byte[] regionNameAsBytes = hri.getRegionName();
+    return toRegionEventDescriptor(eventType,
+        tableNameAsBytes,
+        encodedNameAsBytes,
+        regionNameAsBytes,
+        seqId,
+
+        server,
+        storeFiles);
+  }
+
+  public static RegionEventDescriptor toRegionEventDescriptor(EventType eventType,
+                                                              byte[] tableNameAsBytes,
+                                                              byte[] encodedNameAsBytes,
+                                                              byte[] regionNameAsBytes,
+                                                               long seqId,
+
+                                                              ServerName server,
+                                                              Map<byte[], List<Path>>
storeFiles) {
     RegionEventDescriptor.Builder desc = RegionEventDescriptor.newBuilder()
         .setEventType(eventType)
-        .setTableName(ByteStringer.wrap(hri.getTable().getName()))
-        .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
-        .setRegionName(ByteStringer.wrap(hri.getRegionName()))
+        .setTableName(ByteStringer.wrap(tableNameAsBytes))
+        .setEncodedRegionName(ByteStringer.wrap(encodedNameAsBytes))
+        .setRegionName(ByteStringer.wrap(regionNameAsBytes))
         .setLogSequenceNumber(seqId)
         .setServer(toServerName(server));
 
-    for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
+    for (Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
           .setFamilyName(ByteStringer.wrap(entry.getKey()))
           .setStoreHomeDir(Bytes.toString(entry.getKey()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c44145e..a17df8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -966,10 +966,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
 
   private void initializeWarmup(final CancelableProgressable reporter) throws IOException
{
     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
-
     // Initialize all the HStores
     status.setStatus("Warming up all the Stores");
-    initializeStores(reporter, status);
+    try {
+      initializeStores(reporter, status);
+    } finally {
+      status.markComplete("Done warming up.");
+    }
   }
 
   /**
@@ -6427,9 +6430,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver,
Regi
       fs = FileSystem.get(conf);
     }
 
-    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
+    HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
     r.initializeWarmup(reporter);
-    r.close();
   }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 346a8ed..d5a4a75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -89,10 +89,14 @@ public class WALEdit implements Writable, HeapSize {
 
   // TODO: Get rid of this; see HBASE-8457
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
-  static final byte [] METAROW = Bytes.toBytes("METAROW");
-  static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
-  static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
-  static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
+  @VisibleForTesting
+  public static final byte [] METAROW = Bytes.toBytes("METAROW");
+  @VisibleForTesting
+  public static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
+  @VisibleForTesting
+  public static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
+  @VisibleForTesting
+  public static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
   @VisibleForTesting
   public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
 
@@ -343,7 +347,7 @@ public class WALEdit implements Writable, HeapSize {
     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
   }
 
-  private static byte[] getRowForRegion(HRegionInfo hri) {
+  public static byte[] getRowForRegion(HRegionInfo hri) {
     byte[] startKey = hri.getStartKey();
     if (startKey.length == 0) {
       // empty row key is not allowed in mutations because it is both the start key and the
end key

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index de87ac3..da37cfa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -367,6 +367,11 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint
{
     }
 
     @Override
+    public boolean keepRegionEvents() {
+      return true;
+    }
+
+    @Override
     public List<Path> finishWritingAndClose() throws IOException {
       finishWriting(true);
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ad5774f..8d78480 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -359,6 +359,11 @@ public class WALSplitter {
           editsSkipped++;
           continue;
         }
+        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
+        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
+          editsSkipped++;
+          continue;
+        }
         entryBuffers.appendEntry(entry);
         editsCount++;
         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
@@ -1266,6 +1271,15 @@ public class WALSplitter {
     public boolean flush() throws IOException {
       return false;
     }
+
+    /**
+     * Some WALEdit's contain only KV's for account on what happened to a region.
+     * Not all sinks will want to get those edits.
+     *
+     * @return Return true if this sink wants to get all WALEdit's regardless of if it's
a region
+     * event.
+     */
+    public abstract boolean keepRegionEvents();
   }
 
   /**
@@ -1609,6 +1623,11 @@ public class WALSplitter {
       }
     }
 
+    @Override
+    public boolean keepRegionEvents() {
+      return false;
+    }
+
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
      */
@@ -2060,6 +2079,11 @@ public class WALSplitter {
       return false;
     }
 
+    @Override
+    public boolean keepRegionEvents() {
+      return true;
+    }
+
     void addWriterError(Throwable t) {
       thrown.add(t);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecec35ae/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 67fc60a..c25105a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,18 +56,24 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -422,7 +429,7 @@ public class TestWALSplit {
     REGIONS.clear();
     REGIONS.add(REGION);
 
-    generateWALs(1, 10, -1);
+    generateWALs(1, 10, -1, 0);
     useDifferentDFSClient();
     WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
@@ -432,6 +439,22 @@ public class TestWALSplit {
     assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
   }
 
+  @Test (timeout=300000)
+  public void testSplitRemovesRegionEventsEdits() throws IOException{
+    final String REGION = "region__1";
+    REGIONS.clear();
+    REGIONS.add(REGION);
+
+    generateWALs(1, 10, -1, 100);
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
+
+    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
+  }
+
   /**
    * @param expectedEntries -1 to not assert
    * @return the count across all regions
@@ -610,7 +633,7 @@ public class TestWALSplit {
     REGIONS.add(REGION);
 
     Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
-    generateWALs(1, entryCount, -1);
+    generateWALs(1, entryCount, -1, 0);
     corruptWAL(c1, corruption, true);
 
     useDifferentDFSClient();
@@ -1120,7 +1143,11 @@ public class TestWALSplit {
   }
 
   private Writer generateWALs(int leaveOpen) throws IOException {
-    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
+    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
+  }
+
+  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException
{
+    return generateWALs(writers, entries, leaveOpen, 7);
   }
 
   private void makeRegionDirs(List<String> regions) throws IOException {
@@ -1134,11 +1161,12 @@ public class TestWALSplit {
    * @param leaveOpen index to leave un-closed. -1 to close all.
    * @return the writer that's still open, or null if all were closed.
    */
-  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException
{
+  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
throws IOException {
     makeRegionDirs(REGIONS);
     fs.mkdirs(WALDIR);
     Writer [] ws = new Writer[writers];
     int seq = 0;
+    int numRegionEventsAdded = 0;
     for (int i = 0; i < writers; i++) {
       ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
       for (int j = 0; j < entries; j++) {
@@ -1147,6 +1175,11 @@ public class TestWALSplit {
           String row_key = region + prefix++ + i + j;
           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
               VALUE, seq++);
+
+          if (numRegionEventsAdded < regionEvents) {
+            numRegionEventsAdded ++;
+            appendRegionEvent(ws[i], region);
+          }
         }
       }
       if (i != leaveOpen) {
@@ -1160,6 +1193,8 @@ public class TestWALSplit {
     return ws[leaveOpen];
   }
 
+
+
   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
       throws IOException {
     Path tdir = FSUtils.getTableDir(rootdir, table);
@@ -1270,6 +1305,23 @@ public class TestWALSplit {
     return count;
   }
 
+  private static void appendRegionEvent(Writer w, String region) throws IOException {
+    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
+        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
+        TABLE_NAME.toBytes(),
+        region.getBytes(),
+        String.valueOf(region.hashCode()).getBytes(),
+        1,
+        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
+    final long time = EnvironmentEdgeManager.currentTime();
+    KeyValue kv = new KeyValue(region.getBytes(), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
+        time, regionOpenDesc.toByteArray());
+    final WALKey walKey = new WALKey(region.getBytes(), TABLE_NAME, 1, time,
+        HConstants.DEFAULT_CLUSTER_ID);
+    w.append(
+        new Entry(walKey, new WALEdit().add(kv)));
+  }
+
   public static long appendEntry(Writer writer, TableName table, byte[] region,
       byte[] row, byte[] family, byte[] qualifier,
       byte[] value, long seq)
@@ -1286,9 +1338,11 @@ public class TestWALSplit {
       byte[] row, byte[] family, byte[] qualifier,
       byte[] value, long seq) {
     long time = System.nanoTime();
-    WALEdit edit = new WALEdit();
+
     seq++;
-    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
     return new Entry(new WALKey(region, table, seq, time,
         HConstants.DEFAULT_CLUSTER_ID), edit);
   }


Mime
View raw message