hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject svn commit: r775132 - in /hadoop/hbase/trunk: CHANGES.txt src/java/org/apache/hadoop/hbase/HConstants.java src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Date Fri, 15 May 2009 13:45:12 GMT
Author: jdcryans
Date: Fri May 15 13:45:11 2009
New Revision: 775132

URL: http://svn.apache.org/viewvc?rev=775132&view=rev
Log:
HBASE-1008  [performance] The replay of logs on server crash takes way too long

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=775132&r1=775131&r2=775132&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri May 15 13:45:11 2009
@@ -248,6 +248,7 @@
    HBASE-1417  Cleanup disorientating RPC message
    HBASE-1424  have shell print regioninfo and location on first load if
                DEBUG enabled
+   HBASE-1008  [performance] The replay of logs on server crash takes way too long
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java?rev=775132&r1=775131&r2=775132&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java Fri May 15 13:45:11 2009
@@ -133,6 +133,10 @@
   /** Default size of a reservation block   */
   static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
   
+  /** Default number of threads to use when log splitting 
+   *  to rewrite the logs. More means faster but bigger mem consumption */
+  static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
+  
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775132&r1=775131&r2=775132&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Fri May 15 13:45:11 2009
@@ -24,12 +24,16 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -715,6 +719,7 @@
   public static void splitLog(final Path rootDir, final Path srcDir,
       final FileSystem fs, final Configuration conf)
   throws IOException {
+    long millis = System.currentTimeMillis();
     if (!fs.exists(srcDir)) {
       // Nothing to do
       return;
@@ -735,7 +740,9 @@
       io.initCause(e);
       throw io;
     }
-    LOG.info("log file splitting completed for " + srcDir.toString());
+    long endMillis = System.currentTimeMillis();
+    LOG.info("log file splitting completed in " + (endMillis - millis) +
+        " millis for " + srcDir.toString());
   }
   
   /*
@@ -748,8 +755,10 @@
   private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
     final FileSystem fs, final Configuration conf)
   throws IOException {
-    Map<byte [], SequenceFile.Writer> logWriters =
+    final Map<byte [], SequenceFile.Writer> logWriters =
       new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
+    final Map<byte[], LinkedList<HLogEntry>> logEntries = 
+      new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
@@ -762,78 +771,29 @@
         long length = logfiles[i].getLen();
         HLogKey key = new HLogKey();
         KeyValue val = new KeyValue();
+        SequenceFile.Reader in = null;
         try {
-          SequenceFile.Reader in =
-            new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+          in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
           try {
             int count = 0;
-            for (; in.next(key, val); count++) {
-              byte [] tableName = key.getTablename();
-              byte [] regionName = key.getRegionName();
-              SequenceFile.Writer w = logWriters.get(regionName);
-              if (w == null) {
-                Path logfile = new Path(
-                    HRegion.getRegionDir(
-                        HTableDescriptor.getTableDir(rootDir, tableName),
-                        HRegionInfo.encodeRegionName(regionName)),
-                        HREGION_OLDLOGFILE_NAME);
-                Path oldlogfile = null;
-                SequenceFile.Reader old = null;
-                if (fs.exists(logfile)) {
-                  LOG.warn("Old log file " + logfile +
-                  " already exists. Copying existing file to new file");
-                  oldlogfile = new Path(logfile.toString() + ".old");
-                  fs.rename(logfile, oldlogfile);
-                  old = new SequenceFile.Reader(fs, oldlogfile, conf);
-                }
-                w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-                  KeyValue.class, getCompressionType(conf));
-                // Use copy of regionName; regionName object is reused inside in
-                // HStoreKey.getRegionName so its content changes as we iterate.
-                logWriters.put(regionName, w);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Creating new log file writer for path " + logfile +
-                      " and region " + Bytes.toString(regionName));
-                }
-
-                if (old != null) {
-                  // Copy from existing log file
-                  HLogKey oldkey = new HLogKey();
-                  KeyValue oldval = new KeyValue();
-                  for (; old.next(oldkey, oldval); count++) {
-                    if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
-                      LOG.debug("Copied " + count + " edits");
-                    }
-                    w.append(oldkey, oldval);
-                  }
-                  old.close();
-                  fs.delete(oldlogfile, true);
-                }
+            while (in.next(key, val)) {
+              byte[] regionName = key.getRegionName();
+              LinkedList<HLogEntry> queue = logEntries.get(regionName);
+              if (queue == null) {
+                queue = new LinkedList<HLogEntry>();
+                LOG.debug("Adding queue for " + Bytes.toString(regionName));
+                logEntries.put(regionName, queue);
               }
-              w.append(key, val);
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Applied " + count + " total edits from " +
-                  logfiles[i].getPath().toString());
+              queue.push(new HLogEntry(val, key));
+              count++;
             }
+            LOG.debug("Pushed " + count + " entries");
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
               LOG.warn("Exception processing " + logfiles[i].getPath() +
                   " -- continuing. Possible DATA LOSS!", e);
             }
-          } finally {
-            try {
-              in.close();
-            } catch (IOException e) {
-              LOG.warn("Close in finally threw exception -- continuing", e);
-            }
-            // Delete the input file now so we do not replay edits.  We could
-            // have gotten here because of an exception.  If so, probably
-            // nothing we can do about it. Replaying it, it could work but we
-            // could be stuck replaying for ever. Just continue though we
-            // could have lost some edits.
-            fs.delete(logfiles[i].getPath(), true);
           }
         } catch (IOException e) {
           if (length <= 0) {
@@ -841,14 +801,143 @@
             continue;
           }
           throw e;
+        } finally {
+          try {
+            if (in != null) {
+              in.close();
+            }
+          } catch (IOException e) {
+            LOG.warn("Close in finally threw exception -- continuing", e);
+          }
+          // Delete the input file now so we do not replay edits. We could
+          // have gotten here because of an exception. If so, probably
+          // nothing we can do about it. Replaying it, it could work but we
+          // could be stuck replaying for ever. Just continue though we
+          // could have lost some edits.
+          fs.delete(logfiles[i].getPath(), true);
         }
       }
+      ExecutorService threadPool = 
+        Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+      for (final byte[] key : logEntries.keySet()) {
+
+        Thread thread = new Thread(Bytes.toString(key)) {
+          public void run() {
+            LinkedList<HLogEntry> entries = logEntries.get(key);
+            LOG.debug("Thread got " + entries.size() + " to process");
+            long threadTime = System.currentTimeMillis();
+            try {
+              int count = 0;
+              for (HLogEntry logEntry : entries) {
+                SequenceFile.Writer w = logWriters.get(key);
+                if (w == null) {
+                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+                      .getTableDir(rootDir, logEntry.getKey().getTablename()),
+                      HRegionInfo.encodeRegionName(key)),
+                      HREGION_OLDLOGFILE_NAME);
+                  Path oldlogfile = null;
+                  SequenceFile.Reader old = null;
+                  if (fs.exists(logfile)) {
+                    LOG.warn("Old log file " + logfile
+                        + " already exists. Copying existing file to new file");
+                    oldlogfile = new Path(logfile.toString() + ".old");
+                    fs.rename(logfile, oldlogfile);
+                    old = new SequenceFile.Reader(fs, oldlogfile, conf);
+                  }
+                  w = SequenceFile.createWriter(fs, conf, logfile,
+                      HLogKey.class, KeyValue.class, getCompressionType(conf));
+                  // Use copy of regionName; regionName object is reused inside
+                  // in
+                  // HStoreKey.getRegionName so its content changes as we
+                  // iterate.
+                  logWriters.put(key, w);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Creating new log file writer for path "
+                        + logfile + " and region " + Bytes.toString(key));
+                  }
+
+                  if (old != null) {
+                    // Copy from existing log file
+                    HLogKey oldkey = new HLogKey();
+                    KeyValue oldval = new KeyValue();
+                    for (; old.next(oldkey, oldval); count++) {
+                      if (LOG.isDebugEnabled() && count > 0
+                          && count % 10000 == 0) {
+                        LOG.debug("Copied " + count + " edits");
+                      }
+                      w.append(oldkey, oldval);
+                    }
+                    old.close();
+                    fs.delete(oldlogfile, true);
+                  }
+                }
+                w.append(logEntry.getKey(), logEntry.getEdit());
+                count++;
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Applied " + count + " total edits to "
+                    + Bytes.toString(key) + " in "
+                    + (System.currentTimeMillis() - threadTime) + "ms");
+              }
+            } catch (IOException e) {
+              e = RemoteExceptionHandler.checkIOException(e);
+              LOG.warn("Got while writing region " + Bytes.toString(key)
+                  + " log " + e);
+              e.printStackTrace();
+            }
+          }
+        };
+        threadPool.execute(thread);
+      }
+      threadPool.shutdown();
+      // Wait for all threads to terminate
+      try {
+        for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
+          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+        }
+      }catch(InterruptedException ex) {
+        LOG.warn("Log writers were interrupted, possible data loss!");
+      }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {
         w.close();
       }
     }
   }
+  
+  /**
+   * Utility class that lets us keep track of the edit with it's key
+   * Only used when splitting logs
+   */
+  public static class HLogEntry {
+    private KeyValue edit;
+    private HLogKey key;
+    /**
+     * Constructor for both params
+     * @param edit log's edit
+     * @param key log's key
+     */
+    public HLogEntry(KeyValue edit, HLogKey key) {
+      super();
+      this.edit = edit;
+      this.key = key;
+    }
+    /**
+     * Gets the edit
+     * @return edit
+     */
+    public KeyValue getEdit() {
+      return edit;
+    }
+    /**
+     * Gets the key
+     * @return key
+     */
+    public HLogKey getKey() {
+      return key;
+    }
+
+  }
 
   /**
    * Construct the HLog directory name



Mime
View raw message