hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1492338 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date Wed, 12 Jun 2013 18:35:53 GMT
Author: liyin
Date: Wed Jun 12 18:35:53 2013
New Revision: 1492338

URL: http://svn.apache.org/r1492338
Log:
[HBASE-8714] Add sync_file_range(WAIT) option to flush and compactions.

Author: pritam

Summary:
The idea here is that we don't want runaway compactions and
flushes to write faster than disk to OS/raid cache and then hit the
disks hard on a writeback. For this purpose we invoke a
sync_file_range(WAIT) every megabyte which forces the regionserver to wait
until the data that it has written has been persisted to
disk.

Test Plan:
1) All unit tests.
2) Simple unit test added to trigger option.

Reviewers: liyintang, rshroff, paultuckfield, aaiyer, sdong

Reviewed By: aaiyer

CC: hbase-eng@, hdfs-dev@

Differential Revision: https://phabricator.fb.com/D839449

Task ID: 2233461

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSyncFileRangeThrottling.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1492338&r1=1492337&r2=1492338&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Wed Jun 12 18:35:53 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WriteOptions;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 
@@ -304,6 +305,15 @@ public abstract class AbstractHFileWrite
   protected static FSDataOutputStream createOutputStream(Configuration conf,
       FileSystem fs, Path path, int bytesPerChecksum,
       InetSocketAddress[] favoredNodes) throws IOException {
+    return createOutputStream(conf, fs, path, bytesPerChecksum, favoredNodes,
+        new WriteOptions());
+  }
+
+  /** A helper method to create HFile output streams in constructors */
+  protected static FSDataOutputStream createOutputStream(Configuration conf,
+      FileSystem fs, Path path, int bytesPerChecksum,
+      InetSocketAddress[] favoredNodes, WriteOptions options)
+      throws IOException {
     if (fs instanceof DistributedFileSystem) {
       // Try to use the favoredNodes version via reflection to allow backwards-
       // compatibility.
@@ -311,11 +321,12 @@ public abstract class AbstractHFileWrite
         return (FSDataOutputStream) DistributedFileSystem.class
             .getDeclaredMethod("create", Path.class, FsPermission.class,
                 boolean.class, int.class, short.class, long.class, int.class,
-                Progressable.class, InetSocketAddress[].class)
+                Progressable.class, InetSocketAddress[].class,
+                WriteOptions.class)
             .invoke(fs, path, FsPermission.getDefault(), true,
                 fs.getConf().getInt("io.file.buffer.size", 4096),
                 fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-                bytesPerChecksum, null, favoredNodes);
+                bytesPerChecksum, null, favoredNodes, options);
       } catch (InvocationTargetException ite) {
         // Function was properly called, but threw it's own exception.
         throw new IOException(ite.getCause());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1492338&r1=1492337&r2=1492338&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Jun 12 18:35:53 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.WriteOptions;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
@@ -308,6 +309,7 @@ public class HFile {
     protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
     protected KeyComparator comparator;
     protected InetSocketAddress[] favoredNodes;
+    protected WriteOptions options = new WriteOptions();
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -357,6 +359,12 @@ public class HFile {
       return this;
     }
 
+    public WriterFactory withWriteOptions(WriteOptions options) {
+      Preconditions.checkNotNull(options);
+      this.options = options;
+      return this;
+    }
+
     public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
       // Deliberately not checking for null here.
       this.favoredNodes = favoredNodes;
@@ -370,7 +378,8 @@ public class HFile {
       }
       if (path != null) {
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path,
-            HFile.getBytesPerChecksum(conf, fs.getConf()), favoredNodes);
+            HFile.getBytesPerChecksum(conf, fs.getConf()), favoredNodes,
+            options);
       }
       return createWriter();
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1492338&r1=1492337&r2=1492338&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Jun 12 18:35:53 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WriteOptions;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -758,7 +760,7 @@ public class Store extends SchemaConfigu
       synchronized (flushLock) {
         status.setStatus("Flushing " + this + ": creating writer");
         // A. Write the map out to the disk
-        writer = createWriterInTmp(snapshot.size());
+        writer = createWriterInTmp(snapshot.size(), this.compression, false);
         writer.setTimeRangeTracker(snapshotTimeRangeTracker);
         fileName = writer.getPath().getName();
         try {
@@ -830,9 +832,19 @@ public class Store extends SchemaConfigu
   /*
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  private StoreFile.Writer createWriterInTmp(long maxKeyCount)
-  throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression, false);
+  private StoreFile.Writer createWriterInTmp(long maxKeyCount,
+      Compression.Algorithm compression, boolean isCompaction)
+      throws IOException {
+    // This method is only invoked during write for flush/compaction and hence
+    // on the DFS level we have a sync_file_range(WAIT_AFTER) to throttle these
+    // background writers.
+    WriteOptions options = new WriteOptions();
+    if (conf.getBoolean("hbase.enable.syncfilerange.throttling", false)) {
+      options.setSyncFileRange(NativeIO.SYNC_FILE_RANGE_WAIT_AFTER
+          | NativeIO.SYNC_FILE_RANGE_WRITE);
+    }
+    return createWriterInTmp(maxKeyCount, compression, isCompaction,
+        options);
   }
 
   /*
@@ -842,7 +854,8 @@ public class Store extends SchemaConfigu
    * @return Writer for a new StoreFile in the tmp dir.
    */
   private StoreFile.Writer createWriterInTmp(long maxKeyCount,
-    Compression.Algorithm compression, boolean isCompaction)
+      Compression.Algorithm compression, boolean isCompaction,
+      WriteOptions options)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -861,6 +874,7 @@ public class Store extends SchemaConfigu
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(region.getFavoredNodes())
             .withCompression(compression)
+            .withWriteOptions(options)
             .build();
     w.getHFileWriter().setCompactionWriter(isCompaction);
     // The store file writer's path does not include the CF name, so we need

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1492338&r1=1492337&r2=1492338&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Wed Jun 12 18:35:53 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WriteOptions;
 import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.base.Function;
@@ -649,6 +650,7 @@ public class StoreFile extends SchemaCon
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private  float bloomErrorRate;
+    private WriteOptions options = new WriteOptions();
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs, int blockSize) {
@@ -657,6 +659,7 @@ public class StoreFile extends SchemaCon
       this.fs = fs;
       this.blockSize = blockSize;
       bloomErrorRate = BloomFilterFactory.getErrorRate(conf);
+      options = new WriteOptions();
     }
 
     /**
@@ -672,6 +675,12 @@ public class StoreFile extends SchemaCon
       return this;
     }
 
+    public WriterBuilder withWriteOptions(WriteOptions options) {
+      Preconditions.checkNotNull(dir);
+      this.options = options;
+      return this;
+    }
+
     /**
      * Use either this method or {@link #withOutputDir}, but not both.
      * @param filePath the StoreFile path to write
@@ -891,6 +900,7 @@ public class StoreFile extends SchemaCon
           .withDataBlockEncoder(wb.dataBlockEncoder)
           .withComparator(wb.comparator.getRawComparator())
           .withFavoredNodes(wb.favoredNodes)
+          .withWriteOptions(wb.options)
           .create();
 
       this.kvComparator = wb.comparator;

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSyncFileRangeThrottling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSyncFileRangeThrottling.java?rev=1492338&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSyncFileRangeThrottling.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSyncFileRangeThrottling.java Wed Jun 12 18:35:53 2013
@@ -0,0 +1,77 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.InjectionEventCore;
+import org.apache.hadoop.util.InjectionEventI;
+import org.apache.hadoop.util.InjectionHandler;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSyncFileRangeThrottling {
+  private static final Log LOG = LogFactory
+      .getLog(TestSyncFileRangeThrottling.class);
+  protected static HBaseTestingUtility TEST_UTIL;
+  private static volatile boolean syncFileRangeInvoked = false;
+  private static volatile int flags = 0;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.enable.syncfilerange.throttling", true);
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(1);
+    InjectionHandler.set(new TestSyncFileRangeThrottlingHandler());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  public static class TestSyncFileRangeThrottlingHandler extends
+      InjectionHandler {
+    protected void _processEvent(InjectionEventI event, Object... args) {
+      if (event == InjectionEventCore.NATIVEIO_SYNC_FILE_RANGE) {
+        flags = (Integer) args[0];
+        syncFileRangeInvoked = true;
+      }
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    byte[] family = "family".getBytes();
+    HTable table = TEST_UTIL
+        .createTable("test".getBytes(), family);
+    TEST_UTIL.loadTable2(table, family);
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    for (HRegion region : server.getOnlineRegions()) {
+      if (!new String(region.getTableDesc().getName()).equals(new String(
+          table.getTableName()))) {
+        LOG.info("Skipping since name is : "
+            + new String(region.getTableDesc().getName()));
+        continue;
+      }
+      Store store = region.getStore(family);
+      store.compactRecentForTesting(-1);
+    }
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < 30000 && !syncFileRangeInvoked) {
+      Thread.sleep(1000);
+    }
+    assertTrue(syncFileRangeInvoked);
+    int expectedFlags = NativeIO.SYNC_FILE_RANGE_WAIT_AFTER
+        | NativeIO.SYNC_FILE_RANGE_WRITE;
+    assertEquals(expectedFlags, flags);
+  }
+
+}



Mime
View raw message