hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r707247 - in /hadoop/hbase/trunk: ./ conf/ src/java/org/apache/hadoop/hbase/regionserver/
Date Thu, 23 Oct 2008 02:30:36 GMT
Author: jimk
Date: Wed Oct 22 19:30:35 2008
New Revision: 707247

URL: http://svn.apache.org/viewvc?rev=707247&view=rev
Log:
HBASE-728   Support for HLog appends

- Passes all unit tests.
- Runs the PerformanceEvaluation random-write test in 8 min 43 sec on a 4-node cluster. I believe this is a new speed record.
- Eliminates time-based log rolling and cache flushing (no longer needed now that HLog appends are synced to HDFS)

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct 22 19:30:35 2008
@@ -39,6 +39,7 @@
    HBASE-945   Be consistent in use of qualified/unqualified mapfile paths
    HBASE-946   Row with 55k deletes timesout scanner lease
    HBASE-950   HTable.commit no longer works with existing RowLocks though it's still in API
+   HBASE-728   Support for HLog appends
 
   IMPROVEMENTS
    HBASE-901   Add a limit to key length, check key and value length on client side

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Wed Oct 22 19:30:35 2008
@@ -156,30 +156,25 @@
   </property>
   <property>
     <name>hbase.regionserver.maxlogentries</name>
-    <value>30000</value>
+    <value>100000</value>
     <description>Rotate the HRegion HLogs when count of entries exceeds this
-    value.  Default: 30,000.  Value is checked by a thread that runs every
+    value.  Default: 100,000.  Value is checked by a thread that runs every
     hbase.server.thread.wakefrequency.
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.optionalcacheflushinterval</name>
-    <value>1800000</value>
-    <description>
-    Amount of time to wait since the last time a region was flushed before
-    invoking an optional cache flush (An optional cache flush is a
-    flush even though memcache is not at the memcache.flush.size).
-    Default: 30 minutes (in miliseconds)
+    <name>hbase.regionserver.flushlogentries</name>
+    <value>100</value>
+    <description>Sync the HLog to the HDFS when it has accumulated this many
+    entries. Default 100. Value is checked on every HLog.append
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.optionallogrollinterval</name>
-    <value>1800000</value>
-    <description>
-    Amount of time to wait since the last time a the region server's log was
-    rolled before invoking an optional log roll (An optional log roll is a
-    one in which the log does not contain hbase.regionserver.maxlogentries).
-    Default: 30 minutes (in miliseconds)
+    <name>hbase.regionserver.optionallogflushinterval</name>
+    <value>10000</value>
+    <description>Sync the HLog to the HDFS after this interval if it has not
+    accumulated enough entries to trigger a sync. Default 10 seconds. Units:
+    milliseconds.
     </description>
   </property>
   <property>

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Wed Oct 22 19:30:35 2008
@@ -25,7 +25,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.ConcurrentModificationException;
 
@@ -48,10 +47,8 @@
   private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
 
   private final long threadWakeFrequency;
-  private final long optionalFlushPeriod;
   private final HRegionServer server;
   private final ReentrantLock lock = new ReentrantLock();
-  private long lastOptionalCheck = System.currentTimeMillis();
 
   protected final long globalMemcacheLimit;
   protected final long globalMemcacheLimitLowMark;
@@ -63,8 +60,6 @@
   public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
     super();
     this.server = server;
-    optionalFlushPeriod = conf.getLong(
-        "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
     threadWakeFrequency = conf.getLong(
         HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
         
@@ -82,7 +77,6 @@
     while (!server.isStopRequested()) {
       HRegion r = null;
       try {
-        enqueueOptionalFlushRegions();
         r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
         if (r == null) {
           continue;
@@ -109,15 +103,23 @@
   }
   
   public void request(HRegion r) {
-    addRegion(r, System.currentTimeMillis());
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.contains(r)) {
+        regionsInQueue.add(r);
+        flushQueue.add(r);
+      }
+    }
   }
   
   /**
    * Only interrupt once it's done with a run through the work loop.
    */ 
   void interruptIfNecessary() {
-    if (lock.tryLock()) {
+    lock.lock();
+    try {
       this.interrupt();
+    } finally {
+      lock.unlock();
     }
   }
   
@@ -196,52 +198,6 @@
   }
   
   /**
-   * Find the regions that should be optionally flushed and put them on the
-   * flush queue.
-   */
-  private void enqueueOptionalFlushRegions() {
-    long now = System.currentTimeMillis();
-    if (now - threadWakeFrequency > lastOptionalCheck) {
-      lastOptionalCheck = now;
-      // Queue up regions for optional flush if they need it
-      Set<HRegion> regions = server.getRegionsToCheck();
-      for (HRegion region: regions) {
-        optionallyAddRegion(region, now);
-      }
-    }
-  }
-
-  /*
-   * Add region if not already added and if optional flush period has been
-   * exceeded.
-   * @param r Region to add.
-   * @param now The 'now' to use.  Set last flush time to this value.
-   */
-  private void optionallyAddRegion(final HRegion r, final long now) {
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r) &&
-          (now - optionalFlushPeriod) > r.getLastFlushTime()) {
-        addRegion(r, now);
-      }
-    }
-  }
-  
-  /*
-   * Add region if not already added.
-   * @param r Region to add.
-   * @param now The 'now' to use.  Set last flush time to this value.
-   */
-  private void addRegion(final HRegion r,
-      @SuppressWarnings("unused") final long now) {
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        regionsInQueue.add(r);
-        flushQueue.add(r);
-      }
-    }
-  }
-
-  /**
    * Check if the regionserver's memcache memory usage is greater than the 
    * limit. If so, flush regions with the biggest memcaches until we're down
    * to the lower limit. This method blocks callers until we're down to a safe

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Wed Oct 22 19:30:35 2008
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -82,16 +83,8 @@
  * rolling is not. To prevent log rolling taking place during this period, a
  * separate reentrant lock is used.
  *
- * <p>
- * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
- * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
- * 'atomic' write to the log is meant to serve as insurance against abnormal
- * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
- * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
- * closed -- something that will not happen if RegionServer exits without
- * running its 'close'.
  */
-public class HLog implements HConstants {
+public class HLog extends Thread implements HConstants, Syncable {
   private static final Log LOG = LogFactory.getLog(HLog.class);
   private static final String HLOG_DATFILE = "hlog.dat.";
   static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
@@ -100,8 +93,12 @@
   final Path dir;
   final Configuration conf;
   final LogRollListener listener;
-  final long threadWakeFrequency;
   private final int maxlogentries;
+  private final long optionalFlushInterval;
+  private final int flushlogentries;
+  private volatile int unflushedEntries = 0;
+  private volatile long lastLogFlushTime;
+  final long threadWakeFrequency;
 
   /*
    * Current log file.
@@ -153,13 +150,22 @@
    */
   public HLog(final FileSystem fs, final Path dir, final Configuration conf,
       final LogRollListener listener) throws IOException {
+    
+    super();
+    
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
     this.listener = listener;
-    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.setName(this.getClass().getSimpleName());
     this.maxlogentries =
-      conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
+      conf.getInt("hbase.regionserver.maxlogentries", 100000);
+    this.flushlogentries =
+      conf.getInt("hbase.regionserver.flushlogentries", 100);
+    this.optionalFlushInterval =
+      conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
+    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.lastLogFlushTime = System.currentTimeMillis();
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -168,7 +174,7 @@
   }
 
   /*
-   * Accessor for tests.
+   * Accessor for tests. Not a part of the public API.
    * @return Current state of the monotonically increasing file id.
    */
   public long getFilenum() {
@@ -313,6 +319,7 @@
           }
         }
         this.numEntries = 0;
+        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();
@@ -354,11 +361,15 @@
     cacheFlushLock.lock();
     try {
       synchronized (updateLock) {
+        this.closed = true;
+        if (this.isAlive()) {
+          this.interrupt();
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("closing log writer in " + this.dir.toString());
         }
         this.writer.close();
-        this.closed = true;
+        updateLock.notifyAll();
       }
     } finally {
       cacheFlushLock.unlock();
@@ -415,11 +426,40 @@
 
         this.numEntries++;
       }
+      updateLock.notifyAll();
     }
     if (this.numEntries > this.maxlogentries) {
         requestLogRoll();
     }
   }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void run() {
+    while (!this.closed) {
+      synchronized (updateLock) {
+        if (((System.currentTimeMillis() - this.optionalFlushInterval) >
+              this.lastLogFlushTime) && this.unflushedEntries > 0) {
+          try {
+            sync();
+          } catch (IOException e) {
+            LOG.error("Error flushing HLog", e);
+          }
+        }
+        try {
+          updateLock.wait(this.threadWakeFrequency);
+        } catch (InterruptedException e) {
+          // continue
+        }
+      }
+    }
+  }
+  
+  public void sync() throws IOException {
+    lastLogFlushTime = System.currentTimeMillis();
+    this.writer.sync();
+    unflushedEntries = 0;
+  }
 
   private void requestLogRoll() {
     if (this.listener != null) {
@@ -430,6 +470,9 @@
   private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
     try {
       this.writer.append(logKey, logEdit);
+      if (++unflushedEntries >= flushlogentries) {
+        sync();
+      }
     } catch (IOException e) {
       LOG.fatal("Could not append. Requesting close of log", e);
       requestLogRoll();
@@ -454,7 +497,8 @@
    * @param logEdit
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
+  public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
+  throws IOException {
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -474,6 +518,7 @@
       HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
       doWrite(logKey, logEdit);
       this.numEntries++;
+      updateLock.notifyAll();
     }
 
     if (this.numEntries > this.maxlogentries) {
@@ -563,6 +608,7 @@
         if (seq != null && logSeqId >= seq.longValue()) {
           this.lastSeqWritten.remove(regionName);
         }
+        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 22 19:30:35 2008
@@ -561,7 +561,9 @@
         "running at " + this.serverInfo.getServerAddress().toString() +
         " because logdir " + logdir.toString() + " exists");
     }
-    return new HLog(fs, logdir, conf, logRoller);
+    HLog newlog = new HLog(fs, logdir, conf, logRoller);
+    newlog.start();
+    return newlog;
   }
   
   /*

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=707247&r1=707246&r2=707247&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Wed Oct 22 19:30:35 2008
@@ -25,57 +25,38 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 
 /** Runs periodically to determine if the HLog should be rolled */
 class LogRoller extends Thread implements LogRollListener {
   static final Log LOG = LogFactory.getLog(LogRoller.class);  
   private final ReentrantLock rollLock = new ReentrantLock();
-  private final long optionalLogRollInterval;
-  private long lastLogRollTime;
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
   private final HRegionServer server;
-  private final HBaseConfiguration conf;
   
   /** @param server */
   public LogRoller(final HRegionServer server) {
     super();
     this.server = server;
-    conf = server.conf;
-    this.optionalLogRollInterval = conf.getLong(
-      "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
-    lastLogRollTime = System.currentTimeMillis();
   }
 
   @Override
   public void run() {
     while (!server.isStopRequested()) {
-      while (!rollLog.get() && !server.isStopRequested()) {
-        long now = System.currentTimeMillis();
-        if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
-          rollLog.set(true);
-          this.lastLogRollTime = now;
-        } else {
-          synchronized (rollLog) {
-            try {
-              rollLog.wait(server.threadWakeFrequency);
-            } catch (InterruptedException e) {
-              continue;
-            }
+      if (!rollLog.get()) {
+        synchronized (rollLog) {
+          try {
+            rollLog.wait(server.threadWakeFrequency);
+          } catch (InterruptedException e) {
+            continue;
           }
         }
-      }
-      if (!rollLog.get()) {
-        // There's only two reasons to break out of the while loop.
-        // 1. Log roll requested
-        // 2. Stop requested
-        // so if a log roll was not requested, continue and break out of loop
         continue;
       }
       rollLock.lock();          // Don't interrupt us. We're working
       try {
-        LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
+        LOG.info("Rolling hlog. Number of entries: " +
+            server.getLog().getNumEntries());
         server.getLog().rollWriter();
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);
@@ -107,8 +88,11 @@
    * It is sleeping if rollLock is not held.
    */
   public void interruptIfNecessary() {
-    if (rollLock.tryLock()) {
+    try {
+      rollLock.lock();
       this.interrupt();
+    } finally {
+      rollLock.unlock();
     }
   }
 }
\ No newline at end of file



Mime
View raw message