hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1356925 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Tue, 03 Jul 2012 20:42:28 GMT
Author: mbautin
Date: Tue Jul  3 20:42:27 2012
New Revision: 1356925

URL: http://svn.apache.org/viewvc?rev=1356925&view=rev
Log:
[master] Parallelize closing writers in HLogSplitter

Author: avf

Summary:
SequenceFileWriter does its own buffering. These buffers are flushed
during a call to close. Currently all writers are closed sequentially.
This patch creates a thread pool of configurable size (configured by
hbase.hlog.split.close.threads, default 20) where the close() method is
executed on each WriterAndPath object.

Not porting this to HBase trunk, as HBASE-6134 takes a different
approach.

Test Plan:
Verified correctness by running unit tests for HLogSplitter (will run
all unit tests through MRUnit before committing to svn).

Will test performance by splitting an existing logfile with and
without this change.

Revert Plan:

Reviewers: pkhemani, kranganathan, liyintang

Reviewed By: liyintang

CC: kannan, hbase-eng@

Differential Revision: https://phabricator.fb.com/D509010

Task ID: 1114386

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1356925&r1=1356924&r2=1356925&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue Jul  3 20:42:27 2012
@@ -47,6 +47,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -119,6 +121,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -272,6 +275,8 @@ public class HRegionServer implements HR
 
   private ZooKeeperWrapper zooKeeperWrapper;
 
+  private final ExecutorService logCloseThreadPool;
+  
   // Log Splitting Worker
   private SplitLogWorker splitLogWorker;
 
@@ -386,6 +391,12 @@ public class HRegionServer implements HR
     }
     LOG.info("minCheckFSIntervalMillis=" + minCheckFSIntervalMillis);
     LOG.info("checkFSAbortTimeOutMillis=" + checkFSAbortTimeOutMillis);
+    
+    int logCloseThreads =
+        conf.getInt("hbase.hlog.split.close.threads", 20);
+    logCloseThreadPool =
+        Executors.newFixedThreadPool(logCloseThreads,
+            new DaemonThreadFactory("hregionserver-split-logClose-thread-"));
   }
 
   /**
@@ -1417,7 +1428,8 @@ public class HRegionServer implements HR
     this.server.start();
     // Create the log splitting worker and start it
     this.splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
-        this.getConfiguration(), this.serverInfo.getServerName());
+        this.getConfiguration(), this.serverInfo.getServerName(),
+        logCloseThreadPool);
     splitLogWorker.start();
     LOG.info("HRegionServer started at: " +
       this.serverInfo.getServerAddress().toString());
@@ -1497,6 +1509,7 @@ public class HRegionServer implements HR
       // Wakes run() if it is sleeping
       notifyAll(); // FindBugs NN_NAKED_NOTIFY
     }
+    logCloseThreadPool.shutdown();
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1356925&r1=1356924&r2=1356925&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
Tue Jul  3 20:42:27 2012
@@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -96,7 +97,7 @@ public class SplitLogWorker implements R
   }
 
   public SplitLogWorker(ZooKeeperWrapper watcher, final Configuration conf,
-      final String serverName) {
+      final String serverName, final ExecutorService logCloseThreadPool) {
     this(watcher, conf, serverName, new TaskExecutor () {
       @Override
       public Status exec(String filename, CancelableProgressable p) {
@@ -127,7 +128,7 @@ public class SplitLogWorker implements R
           String tmpname =
             ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
           if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
-              st, fs, conf, p) == false) {
+              st, fs, conf, p, logCloseThreadPool) == false) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1356925&r1=1356924&r2=1356925&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
Tue Jul  3 20:42:27 2012
@@ -23,17 +23,20 @@ import static org.apache.hadoop.hbase.ut
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -44,7 +47,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -52,14 +54,11 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.io.MultipleIOException;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -81,6 +80,9 @@ public class HLogSplitter {
   protected final FileSystem fs;
   protected final Configuration conf;
 
+  // Thread pool for closing LogWriters in parallel
+  protected final ExecutorService logCloseThreadPool;
+
   // If an exception is thrown by one of the other threads, it will be
   // stored here.
   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
@@ -134,12 +136,13 @@ public class HLogSplitter {
   }
 
   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs) {
+      Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool) {
     this.conf = conf;
     this.rootDir = rootDir;
     this.srcDir = srcDir;
     this.oldLogDir = oldLogDir;
     this.fs = fs;
+    this.logCloseThreadPool = logCloseThreadPool;
   }
 
   /**
@@ -162,9 +165,10 @@ public class HLogSplitter {
    */
   static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
       FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter) throws IOException {
+      Configuration conf, CancelableProgressable reporter,
+      ExecutorService logCloseThreadPool) throws IOException {
     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
-        fs);
+        fs, logCloseThreadPool);
     return s.splitLogFileToTemp(logfile, tmpname, reporter);
   }
 
@@ -258,6 +262,7 @@ public class HLogSplitter {
       throw e;
     } finally {
       int n = 0;
+      List<Future<Void>> closeResults = new ArrayList<Future<Void>>();
       for (Object o : logWriters.values()) {
         long t1 = EnvironmentEdgeManager.currentTimeMillis();
         if ((t1 - last_report_at) > period) {
@@ -271,13 +276,42 @@ public class HLogSplitter {
           continue;
         }
         n++;
-        WriterAndPath wap = (WriterAndPath)o;
+
+        final WriterAndPath wap = (WriterAndPath)o;
         try {
-          wap.w.close();
-        } catch (IOException ioe) {
-          LOG.warn("Failed to close recovered edits writer " + wap.p, ioe);
+          Future<Void> closeResult =
+              logCloseThreadPool.submit(new Callable<Void>() {
+                @Override
+                public Void call() {
+                  try {
+                    wap.w.close();
+                  } catch (IOException ioe) {
+                    LOG.warn("Failed to close recovered edits writer " + wap.p, 
+                        ioe);
+                  }
+                  LOG.debug("Closed " + wap.p);
+                  return null;
+                }
+              });
+          closeResults.add(closeResult);
+        } catch (RejectedExecutionException ree) {
+          LOG.warn("Could not close writer " + wap.p + " due to thread pool " +
+              "shutting down.", ree);
+        }
+      }
+      try {
+        for (Future<Void> closeResult : closeResults) {
+          // Uncaught unchecked exception from the threads performing close
+          // should be propagated into the main thread.
+          closeResult.get();
         }
-        LOG.debug("Closed " + wap.p);
+      } catch (ExecutionException ee) {
+        LOG.error("Unexpected exception while closing a log writer", ee);
+        throw new IOException("Unexpected exception while closing", ee);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        LOG.error("Interrupted while closing a log writer", ie);
+        throw new InterruptedIOException("Interrupted while closing " + ie);
       }
       try {
         in.close();

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1356925&r1=1356924&r2=1356925&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
Tue Jul  3 20:42:27 2012
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
@@ -78,7 +81,8 @@ public class TestHLogSplit {
 
   private static final int NUM_WRITERS = 10;
   private static final int ENTRIES = 10; // entries per writer per region
-
+  private static final int NUM_CLOSE_THREADS = 10;
+  
   private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
   private long seq = 0;
   private static final byte[] TABLE_NAME = "t1".getBytes();
@@ -91,6 +95,9 @@ public class TestHLogSplit {
   private static final Path tabledir = new Path(hbaseDir,
       Bytes.toString(TABLE_NAME));
 
+  private static final ExecutorService logCloseThreadPool =
+      Executors.newFixedThreadPool(NUM_CLOSE_THREADS,
+          new DaemonThreadFactory("split-logClose-thread-"));
 
   static enum Corruptions {
     INSERT_GARBAGE_ON_FIRST_LINE,
@@ -718,7 +725,7 @@ public class TestHLogSplit {
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter);
+        reporter, logCloseThreadPool);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
 
@@ -744,7 +751,7 @@ public class TestHLogSplit {
     fs.delete(regiondir, true);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter);
+        reporter, logCloseThreadPool);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     // This test passes if there are no exceptions when
@@ -762,7 +769,7 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter);
+        reporter, logCloseThreadPool);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
@@ -780,7 +787,7 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter);
+        reporter, logCloseThreadPool);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     for (String region : regions) {
@@ -800,7 +807,7 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter);
+        reporter, logCloseThreadPool);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
 



Mime
View raw message