hbase-commits mailing list archives

From: li...@apache.org
Subject: svn commit: r1407903 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Date: Sun, 11 Nov 2012 00:01:41 GMT
Author: liyin
Date: Sun Nov 11 00:01:39 2012
New Revision: 1407903

URL: http://svn.apache.org/viewvc?rev=1407903&view=rev
Log:
[HBASE-6968] Improve HLog group commit throughput - part II

Author: liyintang

Summary:
Since there was a code refactor in D599724 (https://phabricator.fb.com/D599724), it may have
caused some confusion or conflicts with D558495 (https://phabricator.fb.com/D558495). This diff
addresses those concerns:

1) Removed the append/sync logic in completeCacheFlush()
2) Only the hflush function in the logSyncer thread now appends/syncs the WAL, and it is
protected by the updateLock as before (see the sketch after this list)
3) Removed another unnecessary append function in HLog.java
4) Refactored the append/sync metrics code
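
For readers unfamiliar with the pattern: under group commit, handler threads only buffer their
edits and wait, while a single syncer thread drains the buffer, appends every buffered entry to
the writer, and issues one sync for the whole batch. Below is a minimal, self-contained Java
sketch of that idea; the names (GroupCommitSyncer, WalWriter, appendToBuffer, appendAndSync,
waitForSync) are hypothetical stand-ins for illustration, not the actual 0.89-fb HLog API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GroupCommitSyncer {
  /** Hypothetical stand-in for the WAL writer; only append() and sync() matter here. */
  public interface WalWriter {
    void append(String entry) throws IOException;
    void sync() throws IOException;
  }

  private final WalWriter writer;
  private final Object updateLock = new Object(); // guards append+sync, like HLog's updateLock
  private final List<String> pendingEntries = new ArrayList<String>();
  private long lastTxid = 0;   // txid handed out by the most recent buffered append
  private long syncedTxid = 0; // highest txid known to be durable

  public GroupCommitSyncer(WalWriter writer) {
    this.writer = writer;
  }

  // Handler threads call this: buffer the edit and get a txid back; no I/O here.
  public synchronized long appendToBuffer(String entry) {
    pendingEntries.add(entry);
    return ++lastTxid;
  }

  // The single syncer thread calls this in a loop: drain the buffer, then
  // append and sync the whole batch under updateLock (one sync per batch).
  public void appendAndSync() throws IOException {
    List<String> batch;
    long batchTxid;
    synchronized (this) {
      if (pendingEntries.isEmpty()) {
        return; // nothing to sync
      }
      batch = new ArrayList<String>(pendingEntries);
      pendingEntries.clear();
      batchTxid = lastTxid;
    }
    synchronized (updateLock) {
      for (String entry : batch) {
        writer.append(entry);
      }
      writer.sync(); // one sync makes the entire batch durable
    }
    synchronized (this) {
      syncedTxid = batchTxid;
      notifyAll(); // wake handlers blocked in waitForSync()
    }
  }

  // Handler threads call this after appendToBuffer() to wait for durability.
  public synchronized void waitForSync(long txid) throws InterruptedException {
    while (syncedTxid < txid) {
      wait();
    }
  }
}

The payoff is that N concurrent appends cost one sync() instead of N, which is where the
throughput gain this series of diffs is after comes from.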

Test Plan: Not tested yet.

Reviewers: kannan, kranganathan

Reviewed By: kannan

CC: hkuang, hbase-eng@

Differential Revision: https://phabricator.fb.com/D623227

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1407903&r1=1407902&r2=1407903&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Nov 11 00:01:39 2012
@@ -3831,7 +3831,6 @@ public class HRegion implements HeapSize
    */
   private List<KeyValue> get(final Get get) throws IOException {
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    readRequests.incrTotalRequstCount();
     Scan scan = new Scan(get);
 
     List<KeyValue> results = new ArrayList<KeyValue>();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1407903&r1=1407902&r2=1407903&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sun Nov 11 00:01:39 2012
@@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
@@ -269,16 +268,6 @@ public class HLog implements Syncable {
   private static final FileStatus[] NO_FILES = new FileStatus[0];
   protected static final byte[] DUMMY = Bytes.toBytes("");
 
-  static byte [] COMPLETE_CACHE_FLUSH;
-  static {
-    try {
-      COMPLETE_CACHE_FLUSH =
-        "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
-    } catch (UnsupportedEncodingException e) {
-      assert(false);
-    }
-  }
-
   public static class Metric {
     public long min = Long.MAX_VALUE;
     public long max = 0;
@@ -356,7 +345,7 @@ public class HLog implements Syncable {
      * 
      * @return number of log entries synced
      */
-    private int sync() throws IOException {
+    private int appendAndSync() throws IOException {
       synchronized (this) {
         if (currentList.isEmpty()) { // no thing to sync
           return 0;
@@ -373,7 +362,7 @@ public class HLog implements Syncable {
       int syncedEntries = syncList.size();
       while (!syncList.isEmpty()) {
         Entry entry = syncList.remove();
-        append(entry);
+        writer.append(entry);
       }
       
       // sync the data
@@ -939,14 +928,15 @@ public class HLog implements Syncable {
     } catch (InterruptedException e) {
       LOG.error("Exception while waiting for syncer thread to die", e);
     }
-
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("closing hlog writer in " + this.dir.toString());
+    }
+    
     cacheFlushLock.writeLock().lock();
     try {
       synchronized (updateLock) {
         this.closed = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("closing hlog writer in " + this.dir.toString());
-        }
         cleanupCurrentWriter(-1);
       }
     } finally {
@@ -964,43 +954,6 @@ public class HLog implements Syncable {
     return new HLogKey(regionName, tableName, seqnum, now);
   }
 
-
-  /** Append an entry to the log.
-   *
-   * @param regionInfo
-   * @param logEdit
-   * @param logKey
-   * @throws IOException
-   */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
-  throws IOException {
-    if (logSyncerThread.syncerShuttingDown) {
-      // can't acquire lock for the duration of append()
-      // so this is just a best-effort check
-      throw new IOException("Cannot append; logSyncer shutting down");
-    }
-    byte [] regionName = regionInfo.getRegionName();
-    synchronized (this.appendLock) {
-      if (this.closed) {
-        throw new IOException("Cannot append; log is closed");
-      }
-      long seqNum = obtainSeqNum();
-      logKey.setLogSeqNum(seqNum);
-      // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region (i.e. the first edit added to the particular
-      // memstore). When the cache is flushed, the entry for the
-      // region being flushed is removed if the sequence number of the flush
-      // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-      doWrite(regionInfo, logKey, logEdit);
-      this.unflushedEntries.incrementAndGet();
-      this.numEntries.incrementAndGet();
-    }
-
-    // sync txn to file system
-    this.sync(regionInfo.isMetaRegion());
-  }
-
   /**
    * Append a set of edits to the log. Log edits are keyed by regionName,
    * rowname, and log-sequence-id.
@@ -1025,8 +978,7 @@ public class HLog implements Syncable {
    * @throws IOException
    */
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    final long now)
-  throws IOException {
+    final long now) throws IOException {
     if (!this.enabled || edits.isEmpty()) {
       return;
     }
@@ -1041,16 +993,14 @@ public class HLog implements Syncable {
     
     long start = System.currentTimeMillis();
     byte[] regionName = info.getRegionName();
+
     synchronized (this.appendLock) {
-      if (this.closed) {
-        throw new IOException("Cannot append; log is closed");
-      }
-      long seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). . When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
+      long seqNum = obtainSeqNum();
       this.lastSeqWritten.putIfAbsent(regionName, seqNum);
       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
       
@@ -1058,42 +1008,39 @@ public class HLog implements Syncable {
       // Only count 1 row as an unflushed entry.
       txid = this.unflushedEntries.incrementAndGet();
     }
-    long time = System.currentTimeMillis() - start;
     
-    // Update the metrics and log the outliers
+    // Update the metrics 
     this.numEntries.incrementAndGet();
     writeSize.inc(len);
-    writeTime.inc(time);
-    if (time > 1000) {
+
+    // sync txn to file system
+    start = System.currentTimeMillis();
+    this.sync(info.isMetaRegion(), txid);
+    
+    // Update the metrics and log down the outliers
+    long end = System.currentTimeMillis();
+    long syncTime = end - start;
+    gsyncTime.inc(syncTime);
+    if (syncTime > 1000) {
       LOG.warn(String.format(
         "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
-        Thread.currentThread().getName(), time, this.numEntries.get(),
+        Thread.currentThread().getName(), syncTime, this.numEntries.get(),
         StringUtils.humanReadableInt(len)));
     }
     
+    // Update the per-request profiling data
     Call call = HRegionServer.callContext.get();
     ProfilingData pData = call == null ? null : call.getProfilingData();
     if (pData != null) {
-      pData.addLong(ProfilingData.HLOG_WRITE_TIME_MS, time);
-    }
-
-    // sync txn to file system
-    start = System.currentTimeMillis();
-    this.sync(info.isMetaRegion(), txid);
-    long end = System.currentTimeMillis();
-    time = end - start;
-    
-    gsyncTime.inc(time);
-    if (pData != null) {
       if (this.lastLogRollStartTimeMillis > start
           && end > this.lastLogRollStartTimeMillis) {
         // We also had a log roll in between
         pData.addLong(ProfilingData.HLOG_ROLL_TIME_MS, this.lastLogRollDurationMillis);
         // Do not account for this as the sync time.
-        time = time - this.lastLogRollDurationMillis;
+        syncTime = syncTime - this.lastLogRollDurationMillis;
       }
       // update sync time
-      pData.addLong(ProfilingData.HLOG_SYNC_TIME_MS, time);
+      pData.addLong(ProfilingData.HLOG_SYNC_TIME_MS, syncTime);
     }
   }
 
@@ -1130,7 +1077,6 @@ public class HLog implements Syncable {
     public void run() {
       try {
         long lastHFlushAt = EnvironmentEdgeManager.currentTimeMillis();
-        boolean isQueueEmpty;
         lock.lock();
         // awaiting with a timeout doesn't always
         // throw exceptions on interrupt
@@ -1185,7 +1131,6 @@ public class HLog implements Syncable {
      * and then waits for it to happen before returning.
      */
     public void addToSyncQueue(boolean force, long txid) {
-
       while (true) {
         // Don't bother if somehow our append was already hflushed
         // Check this without even acquiring the lock, in the hope
@@ -1244,36 +1189,35 @@ public class HLog implements Syncable {
       if (this.closed) {
         return;
       }
-      boolean logRollRequested = false;
+      boolean isUnderReplication = false;
       if (this.forceSync ||
           this.unflushedEntries.get() - this.syncTillHere >= this.flushlogentries) {
         try {
-          this.syncTillHere += this.logBuffer.sync();
-
-          // if the number of replicas in HDFS has fallen below the initial
-          // value, then roll logs.
-          try {
-            int numCurrentReplicas = getLogReplication();
-            if (numCurrentReplicas != 0 &&
-                numCurrentReplicas < this.initialReplication) {
-              LOG.warn("HDFS pipeline error detected. " +
-                  "Found " + numCurrentReplicas + " replicas but expecting " +
-                  this.initialReplication + " replicas. " +
-                  " Requesting close of hlog.");
-              requestLogRoll();
-              logRollRequested = true;
-            }
-          } catch (Exception e) {
-              LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-                       " still proceeding ahead...");
-          }
+          this.syncTillHere += this.logBuffer.appendAndSync();
         } catch (IOException e) {
           syncFailureAbortStrategy.abort("Could not sync hlog. Aborting", e);
         }
       }
-
+      
+      // if the number of replicas in HDFS has fallen below the initial
+      // value, then roll logs.
       try {
-        if (!logRollRequested && (this.writer.getLength() > this.logrollsize)) {
+        int numCurrentReplicas = getLogReplication();
+        if (numCurrentReplicas != 0 &&
+            numCurrentReplicas < this.initialReplication) {
+          LOG.warn("HDFS pipeline error detected. " +
+              "Found " + numCurrentReplicas + " replicas but expecting " +
+              this.initialReplication + " replicas. " +
+              " Requesting close of hlog.");
+          isUnderReplication = true;
+        }
+      } catch (Exception e) {
+          LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+                   " still proceeding ahead...");
+      }
+      
+      try {
+        if (isUnderReplication || (this.writer.getLength() > this.logrollsize)) {
           requestLogRoll();
         }
       } catch (IOException ioe) {
@@ -1281,7 +1225,7 @@ public class HLog implements Syncable {
       }
     }
   }
-
+  
   /**
    * This method gets the datanode replication count for the current HLog.
    *
@@ -1316,34 +1260,6 @@ public class HLog implements Syncable {
     this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
   }
   
-  /**
-   * Append a log entry into the writer
-   * @param entry
-   * @throws IOException
-   */
-  private void append(Entry entry) throws IOException {
-    try {
-      long now = System.currentTimeMillis();
-      this.writer.append(entry);
-      long took = System.currentTimeMillis() - now;
-      long len = 0;
-      for(KeyValue kv : entry.edit.getKeyValues()) {
-        len += kv.getLength();
-      }
-      writeSize.inc(len);
-      if (took > 1000) {
-        LOG.warn(String.format(
-          "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
-          Thread.currentThread().getName(), took, this.numEntries.get(),
-          StringUtils.humanReadableInt(len)));
-      }
-    } catch (IOException e) {
-      LOG.fatal("Could not append. Requesting close of hlog", e);
-      requestLogRoll();
-      throw e;
-    }
-  }
-
   /** @return How many items have been added to the log */
   int getNumEntries() {
     return numEntries.get();
@@ -1422,41 +1338,12 @@ public class HLog implements Syncable {
    * @param logSeqId
    * @throws IOException
    */
-  public void completeCacheFlush(final byte [] regionName,
-      final byte [] tableName,
-      final long logSeqId, final boolean isMetaRegion)  {
-    try {
-      synchronized (updateLock) {
-        if (this.closed) {
-          return;
-        }
-        try {
-          WALEdit edit = completeCacheFlushLogEdit();
-          HLogKey key = makeKey(regionName, tableName, logSeqId,
-              System.currentTimeMillis());
-          this.writer.append(new Entry(key, edit));
-          this.numEntries.incrementAndGet();
-        } catch (IOException ioe) {
-          LOG.warn("Failed to write cache-flush-done entry. Ignoring.", ioe);
-        }
-      }
-      // sync txn to file system
-      this.sync(isMetaRegion);
-    } finally {
-      // updateLock not needed for removing snapshot's entry
-      // Cleaning up of lastSeqWritten is in the finally clause because we
-      // don't want to confuse getOldestOutstandingSeqNum()
-      this.lastSeqWritten.remove(regionName);
-      this.cacheFlushLock.readLock().unlock();
-    }
-  }
-
-  private WALEdit completeCacheFlushLogEdit() {
-    KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
-    WALEdit e = new WALEdit();
-    e.add(kv);
-    return e;
+  public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
+      final long logSeqId, final boolean isMetaRegion) {
+    // Cleaning up of lastSeqWritten is in the finally clause because we
+    // don't want to confuse getOldestOutstandingSeqNum()
+    this.lastSeqWritten.remove(regionName);
+    this.cacheFlushLock.readLock().unlock();
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1407903&r1=1407902&r2=1407903&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Sun Nov 11 00:01:39 2012
@@ -183,7 +183,7 @@ public class TestHLog  {
    * @throws Exception
    */
   @Test
-  public void Broken_testSync() throws Exception {
+  public void testSync() throws Exception {
     byte [] bytes = Bytes.toBytes(getName());
     // First verify that using streams all works.
     Path p = new Path(dir, getName() + ".fsdos");
@@ -483,20 +483,6 @@ public class TestHLog  {
         System.out.println(key + " " + val);
         count++;
       }
-      HLog.Entry entry = null;
-      while ((entry = reader.next(null)) != null) {
-        HLogKey key = entry.getKey();
-        WALEdit val = entry.getEdit();
-        // Assert only one more row... the meta flushed row.
-        assertTrue(Bytes.equals(regionName, key.getRegionName()));
-        assertTrue(Bytes.equals(tableName, key.getTablename()));
-        KeyValue kv = val.getKeyValues().get(0);
-        assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
-        assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-          val.getKeyValues().get(0).getValue()));
-        System.out.println(key + " " + val);
-      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -550,20 +536,6 @@ public class TestHLog  {
         System.out.println(entry.getKey() + " " + val);
         idx++;
       }
-
-      // Get next row... the meta flushed row.
-      entry = reader.next();
-      assertEquals(1, entry.getEdit().size());
-      for (KeyValue val : entry.getEdit().getKeyValues()) {
-        assertTrue(Bytes.equals(hri.getRegionName(),
-          entry.getKey().getRegionName()));
-        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
-        assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
-        assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-          val.getValue()));
-        System.out.println(entry.getKey() + " " + val);
-      }
     } finally {
       if (log != null) {
         log.closeAndDelete();
@@ -607,14 +579,15 @@ public class TestHLog  {
 
     // Flush the first region, we expect to see the first two files getting
     // archived
+    addEdits(log, hri, tableName, 1);
     long seqId = log.startCacheFlush(hri.getRegionName());
     log.completeCacheFlush(hri.getRegionName(), tableName, seqId, false);
     log.rollWriter();
     assertEquals(2, log.getNumLogFiles());
 
     // Flush the second region, which removes all the remaining output files
-    // since the oldest was completely flushed and the two others only contain
-    // flush information
+    // since the oldest was completely flushed.
+    addEdits(log, hri2, tableName2, 1);
     seqId = log.startCacheFlush(hri2.getRegionName());
     log.completeCacheFlush(hri2.getRegionName(), tableName2, seqId, false);
     log.rollWriter();


