hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject hbase git commit: HBASE-11571 Bulk load handling from secondary region replicas
Date Tue, 03 Mar 2015 23:23:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 6b1674b31 -> 4506b6e00


HBASE-11571 Bulk load handling from secondary region replicas


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4506b6e0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4506b6e0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4506b6e0

Branch: refs/heads/branch-1
Commit: 4506b6e00315c50bd3537a5fe7f8970c95246aa4
Parents: 6b1674b
Author: Jeffrey Zhong <jeffreyz@apache.org>
Authored: Fri Feb 27 17:21:06 2015 -0800
Committer: Jeffrey Zhong <jeffreyz@apache.org>
Committed: Tue Mar 3 15:21:22 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 87 ++++++++++++++++-
 .../hadoop/hbase/regionserver/HStore.java       | 25 +++--
 .../hbase/regionserver/RSRpcServices.java       |  6 ++
 .../apache/hadoop/hbase/regionserver/Store.java |  3 +-
 .../hadoop/hbase/regionserver/TestBulkLoad.java |  3 -
 .../regionserver/TestHRegionReplayEvents.java   | 99 ++++++++++++++++++++
 6 files changed, 210 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9389bb5..d90d72c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -4664,6 +4665,86 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
+    checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
+      "BulkLoad marker from WAL ", bulkLoadEvent);
+
+    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+      return; // if primary nothing to do
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getRegionInfo().getEncodedName() + " : "
+              +  "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
+    }
+    // check if multiple families involved
+    boolean multipleFamilies = false;
+    byte[] family = null;
+    for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
+      byte[] fam = storeDescriptor.getFamilyName().toByteArray();
+      if (family == null) {
+        family = fam;
+      } else if (!Bytes.equals(family, fam)) {
+        multipleFamilies = true;
+        break;
+      }
+    }
+
+    startBulkRegionOperation(multipleFamilies);
+    try {
+      // we will use writestate as a coarse-grain lock for all the replay events
+      synchronized (writestate) {
+        // Replication can deliver events out of order when primary region moves or the region
+        // server crashes, since there is no coordination between replication of different wal files
+        // belonging to different region servers. We have to safe guard against this case by using
+        // region open event's seqid. Since this is the first event that the region puts (after
+        // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
+        // smaller than this seqId
+        if (bulkLoadEvent.getBulkloadSeqNum() >= 0
+            && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Skipping replaying bulkload event :"
+              + TextFormat.shortDebugString(bulkLoadEvent)
+              + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
+              + " =" + lastReplayedOpenRegionSeqId);
+
+          return;
+        }
+
+        for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
+          // stores of primary may be different now
+          family = storeDescriptor.getFamilyName().toByteArray();
+          Store store = getStore(family);
+          if (store == null) {
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+                    + "Received a bulk load marker from primary, but the family is not found. "
+                    + "Ignoring. StoreDescriptor:" + storeDescriptor);
+            continue;
+          }
+
+          List<String> storeFiles = storeDescriptor.getStoreFileList();
+          for (String storeFile : storeFiles) {
+            StoreFileInfo storeFileInfo = null;
+            try {
+              storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
+              store.bulkLoadHFile(storeFileInfo);
+            } catch(FileNotFoundException ex) {
+              LOG.warn(getRegionInfo().getEncodedName() + " : "
+                      + ((storeFileInfo != null) ? storeFileInfo.toString() :
+                            (new Path(Bytes.toString(family), storeFile)).toString())
+                      + " doesn't exist any more. Skip loading the file");
+            }
+          }
+        }
+      }
+      if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
+        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+      }
+    } finally {
+      closeBulkRegionOperation();
+    }
+  }
+
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */
@@ -5014,13 +5095,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           if (bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
-          store.bulkLoadHFile(finalPath, seqId);
+          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
 
           if(storeFiles.containsKey(familyName)) {
-            storeFiles.get(familyName).add(new Path(finalPath));
+            storeFiles.get(familyName).add(commitedStoreFile);
           } else {
             List<Path> storeFileNames = new ArrayList<Path>();
-            storeFileNames.add(new Path(finalPath));
+            storeFileNames.add(commitedStoreFile);
             storeFiles.put(familyName, storeFileNames);
           }
           if (bulkLoadListener != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 29952fd..67c261c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -781,19 +781,33 @@ public class HStore implements Store {
   }
 
   @Override
-  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
 
+    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+        + dstPath + " - updating store file list.");
+
     StoreFile sf = createStoreFileAndReader(dstPath);
+    bulkLoadHFile(sf);
+
+    LOG.info("Successfully loaded store file " + srcPath + " into store " + this
+        + " (new location: " + dstPath + ")");
+
+    return dstPath;
+  }
 
+  @Override
+  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
+    StoreFile sf = createStoreFileAndReader(fileInfo);
+    bulkLoadHFile(sf);
+  }
+
+  private void bulkLoadHFile(StoreFile sf) throws IOException {
     StoreFile.Reader r = sf.getReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
 
-    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
-        "' as " + dstPath + " - updating store file list.");
-
     // Append the new storefile into the list
     this.lock.writeLock().lock();
     try {
@@ -807,8 +821,7 @@ public class HStore implements Store {
       this.lock.writeLock().unlock();
     }
     notifyChangedReadersObservers();
-    LOG.info("Successfully loaded store file " + srcPath
-        + " into store " + this + " (new location: " + dstPath + ")");
+    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
     if (LOG.isTraceEnabled()) {
       String traceMessage = "BULK LOAD time,size,store size,store files ["
           + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize

http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ccad06e..0e426d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
@@ -730,6 +731,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               region.replayWALRegionEventMarker(regionEvent);
               continue;
             }
+            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
+            if (bulkLoadEvent != null) {
+              region.replayWALBulkLoadEventMarker(bulkLoadEvent);
+              continue;
+            }
           }
           it.remove();
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 6a422a9..b638a8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -247,7 +247,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * @param srcPathStr
    * @param sequenceId sequence Id associated with the HFile
    */
-  void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
+  Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
 
   // General accessors into the state of the store
   // TODO abstract some of this out into a metrics class
@@ -440,4 +440,5 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
     */
   void refreshStoreFiles(Collection<String> newFiles) throws IOException;
 
+  void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 15dbef5..931306c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -296,9 +296,6 @@ public class TestBulkLoad {
         assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
         assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
         assertEquals(storeFileNames.size(), store.getStoreFileCount());
-        for (String storeFile : store.getStoreFileList()) {
-          assertTrue(storeFile.equals(storeFileNames.get(index++)));
-        }
       }
       
       return true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4506b6e0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index d3ac837..14d898f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -28,33 +28,46 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
 
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
@@ -63,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -74,6 +88,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.Lists;
@@ -1327,6 +1342,90 @@ public class TestHRegionReplayEvents {
     region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
   }
 
+  /**
+   * Tests replaying region open markers from primary region. Checks whether the files are picked up
+   */
+  @Test
+  public void testReplayBulkLoadEvent() throws IOException {
+    LOG.info("testReplayBulkLoadEvent starts");
+    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
+
+    // close the region and open again.
+    primaryRegion.close();
+    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+    
+    // bulk load a file into primary region
+    Random random = new Random();
+    byte[] randomValues = new byte[20];
+    random.nextBytes(randomValues);
+    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
+
+    List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
+    int expectedLoadFileCount = 0;
+    for (byte[] family : families) {
+      familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
+        randomValues)));
+      expectedLoadFileCount++;
+    }
+    primaryRegion.bulkLoadHFiles(familyPaths, false);
+
+    // now replay the edits and the bulk load marker
+    reader = createWALReaderForPrimary();
+
+    LOG.info("-- Replaying edits and region events in secondary");
+    BulkLoadDescriptor bulkloadEvent = null;
+    while (true) {
+      WAL.Entry entry = reader.next();
+      if (entry == null) {
+        break;
+      }
+      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
+      if (bulkloadEvent != null) {
+        break;
+      }
+    }
+
+    // we should have 1 bulk load event
+    assertTrue(bulkloadEvent != null);
+    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
+
+    // replay the bulk load event
+    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
+
+
+    List<String> storeFileName = new ArrayList<String>();
+    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
+      storeFileName.addAll(storeDesc.getStoreFileList());
+    }
+    // assert that the bulk loaded files are picked
+    for (Store s : secondaryRegion.getStores().values()) {
+      for (StoreFile sf : s.getStorefiles()) {
+        storeFileName.remove(sf.getPath().getName());
+      }
+    }
+    assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
+
+    LOG.info("-- Verifying edits from secondary");
+    for (byte[] family : families) {
+      assertGet(secondaryRegion, family, randomValues);
+    }
+  }
+
+  private String createHFileForFamilies(Path testPath, byte[] family,
+      byte[] valueBytes) throws IOException {
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
+    // TODO We need a way to do this without creating files
+    Path testFile = new Path(testPath, UUID.randomUUID().toString());
+    hFileFactory.withOutputStream(TEST_UTIL.getTestFileSystem().create(testFile));
+    hFileFactory.withFileContext(new HFileContext());
+    HFile.Writer writer = hFileFactory.create();
+
+    writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0l,
+      KeyValue.Type.Put.getCode(), valueBytes)));
+    writer.close();
+    return testFile.toString();
+  }
+
   /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
    * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of
    * more rows but does not execute flush after


Mime
View raw message