hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1550778 - in /hbase/trunk: hbase-common/src/main/resources/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Fri, 13 Dec 2013 17:32:10 GMT
Author: stack
Date: Fri Dec 13 17:32:09 2013
New Revision: 1550778

URL: http://svn.apache.org/r1550778
Log:
HBASE-8755 A new write thread model for HLog to improve the overall HBase write throughput

Modified:
    hbase/trunk/hbase-common/src/main/resources/hbase-default.xml
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java

Modified: hbase/trunk/hbase-common/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/resources/hbase-default.xml?rev=1550778&r1=1550777&r2=1550778&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/hbase-common/src/main/resources/hbase-default.xml Fri Dec 13 17:32:09 2013
@@ -204,12 +204,6 @@ possible configurations would overwhelm 
     in milliseconds.</description>
   </property>
   <property>
-    <name>hbase.regionserver.optionallogflushinterval</name>
-    <value>1000</value>
-    <description>Sync the HLog to the HDFS after this interval if it has not
-    accumulated enough entries to trigger a sync. Units: milliseconds.</description>
-  </property>
-  <property>
     <name>hbase.regionserver.regionSplitLimit</name>
     <value>2147483647</value>
     <description>Limit for the number of regions after which no more region

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1550778&r1=1550777&r2=1550778&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
Fri Dec 13 17:32:09 2013
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.util.Drai
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
@@ -118,15 +117,19 @@ class FSHLog implements HLog, Syncable {
   // Listeners that are called on WAL events.
   private List<WALActionsListener> listeners =
     new CopyOnWriteArrayList<WALActionsListener>();
-  private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
   private final AtomicLong unflushedEntries = new AtomicLong(0);
-  private volatile long syncedTillHere = 0;
+  private final AtomicLong syncedTillHere = new AtomicLong(0);
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
 
+  // all writes pending on AsyncWriter/AsyncSyncer thread with
+  // txid <= failedTxid will fail by throwing asyncIOE
+  private final AtomicLong failedTxid = new AtomicLong(0);
+  private volatile IOException asyncIOE = null;
+
   private WALCoprocessorHost coprocessorHost;
 
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current
SequenceFile.writer
@@ -208,7 +211,7 @@ class FSHLog implements HLog, Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
-  private final Object flushLock = new Object();
+  private final Object pendingWritesLock = new Object();
 
   private final boolean enabled;
 
@@ -219,10 +222,20 @@ class FSHLog implements HLog, Syncable {
    */
   private final int maxLogs;
 
-  /**
-   * Thread that handles optional sync'ing
-   */
-  private final LogSyncer logSyncer;
+  // List of pending writes to the HLog. There corresponds to transactions
+  // that have not yet returned to the client. We keep them cached here
+  // instead of writing them to HDFS piecemeal. The goal is to increase
+  // the batchsize for writing-to-hdfs as well as sync-to-hdfs, so that
+  // we can get better system throughput.
+  private List<Entry> pendingWrites = new LinkedList<Entry>();
+
+  private final AsyncWriter   asyncWriter;
+  // since AsyncSyncer takes much longer than other phase(add WALEdits to local
+  // buffer, write local buffer to HDFS, notify pending write handler threads),
+  // when a sync is ongoing, all other phase pend, we use multiple parallel
+  // AsyncSyncer threads to improve overall throughput.
+  private final AsyncSyncer[] asyncSyncers;
+  private final AsyncNotifier asyncNotifier;
 
   /** Number of log close errors tolerated before we abort */
   private final int closeErrorsTolerated;
@@ -368,8 +381,6 @@ class FSHLog implements HLog, Syncable {
     // Roll at 95% of block size.
     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
     this.logrollsize = (long)(this.blocksize * multi);
-    this.optionalFlushInterval =
-      conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
 
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.minTolerableReplication = conf.getInt(
@@ -381,13 +392,11 @@ class FSHLog implements HLog, Syncable {
     this.closeErrorsTolerated = conf.getInt(
         "hbase.regionserver.logroll.errors.tolerated", 0);
 
-    this.logSyncer = new LogSyncer(this.optionalFlushInterval);
 
     LOG.info("WAL/HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
-      ", enabled=" + this.enabled +
-      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+      ", enabled=" + this.enabled);
     // If prefix is null||empty then just name it hlog
     this.prefix = prefix == null || prefix.isEmpty() ?
         "hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -411,15 +420,22 @@ class FSHLog implements HLog, Syncable {
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
 
-    // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
-    if (this.optionalFlushInterval > 0) {
-      Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
-          + ".logSyncer");
-    } else {
-      LOG.info("hbase.regionserver.optionallogflushinterval is set as "
-          + this.optionalFlushInterval + ". Deferred log syncing won't work. "
-          + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
+    final String n = Thread.currentThread().getName();
+
+
+    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
+    asyncWriter.start();
+   
+    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
+    asyncSyncers = new AsyncSyncer[syncerNums];
+    for (int i = 0; i < asyncSyncers.length; ++i) {
+      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
+      asyncSyncers[i].start();
     }
+
+    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
+    asyncNotifier.start();
+
     coprocessorHost = new WALCoprocessorHost(this, conf);
 
     this.metrics = new MetricsWAL();
@@ -735,11 +751,11 @@ class FSHLog implements HLog, Syncable {
       try {
         // Wait till all current transactions are written to the hlog.
         // No new transactions can occur because we have the updatelock.
-        if (this.unflushedEntries.get() != this.syncedTillHere) {
+        if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
           LOG.debug("cleanupCurrentWriter " +
                    " waiting for transactions to get synced " +
                    " total " + this.unflushedEntries.get() +
-                   " synced till here " + syncedTillHere);
+                   " synced till here " + this.syncedTillHere.get());
           sync();
         }
         this.writer.close();
@@ -874,17 +890,33 @@ class FSHLog implements HLog, Syncable {
     if (this.closed) {
       return;
     }
-    // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
-    if (this.optionalFlushInterval > 0) {
+
+    try {
+      asyncNotifier.interrupt();
+      asyncNotifier.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for " + asyncNotifier.getName() +
+          " threads to die", e);
+    }
+
+    for (int i = 0; i < asyncSyncers.length; ++i) {
       try {
-        logSyncer.close();
-        // Make sure we synced everything
-        logSyncer.join(this.optionalFlushInterval * 2);
+        asyncSyncers[i].interrupt();
+        asyncSyncers[i].join();
       } catch (InterruptedException e) {
-        LOG.error("Exception while waiting for syncer thread to die", e);
-        Thread.currentThread().interrupt();
+        LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
+            " threads to die", e);
       }
     }
+
+    try {
+      asyncWriter.interrupt();
+      asyncWriter.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for " + asyncWriter.getName() +
+          " thread to die", e);
+    }
+
     try {
       // Prevent all further flushing and rolling.
       closeBarrier.stopAndDrainOps();
@@ -985,9 +1017,14 @@ class FSHLog implements HLog, Syncable {
           if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
           HLogKey logKey = makeKey(
             encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);
-          doWrite(info, logKey, edits, htd);
+
+          synchronized (pendingWritesLock) {
+            doWrite(info, logKey, edits, htd);
+            txid = this.unflushedEntries.incrementAndGet();
+          }
           this.numEntries.incrementAndGet();
-          txid = this.unflushedEntries.incrementAndGet();
+          this.asyncWriter.setPendingTxid(txid);
+
           if (htd.isDeferredLogFlush()) {
             lastDeferredTxid = txid;
           }
@@ -1017,91 +1054,245 @@ class FSHLog implements HLog, Syncable {
         now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
   }
 
-  /**
-   * This class is responsible to hold the HLog's appended Entry list
-   * and to sync them according to a configurable interval.
-   *
-   * Deferred log flushing works first by piggy backing on this process by
-   * simply not sync'ing the appended Entry. It can also be sync'd by other
-   * non-deferred log flushed entries outside of this thread.
-   */
-  class LogSyncer extends HasThread {
-
-    private final long optionalFlushInterval;
-
-    private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
-
-    // List of pending writes to the HLog. There corresponds to transactions
-    // that have not yet returned to the client. We keep them cached here
-    // instead of writing them to HDFS piecemeal, because the HDFS write
-    // method is pretty heavyweight as far as locking is concerned. The
-    // goal is to increase the batchsize for writing-to-hdfs as well as
-    // sync-to-hdfs, so that we can get better system throughput.
-    private List<Entry> pendingWrites = new LinkedList<Entry>();
+  /* The work of current write process of HLog goes as below:
+   * 1). All write handler threads append edits to HLog's local pending buffer;
+   *     (it notifies AsyncWriter thread that there is new edits in local buffer)
+   * 2). All write handler threads wait in HLog.syncer() function for underlying threads
to
+   *     finish the sync that contains its txid;
+   * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's
+   *     local pending buffer and writing to the hdfs (hlog.writer.append);
+   *     (it notifies AsyncSyncer threads that there is new writes to hdfs which needs a
sync)
+   * 4). AsyncSyncer threads are responsible for issuing sync request to hdfs to persist
the
+   *     writes by AsyncWriter; (they notify the AsyncNotifier thread that sync is done)
+   * 5). An AsyncNotifier thread is responsible for notifying all pending write handler
+   *     threads which are waiting in the HLog.syncer() function
+   * 6). No LogSyncer thread any more (since there is always AsyncWriter/AsyncFlusher threads
+   *     do the same job it does)
+   * note: more than one AsyncSyncer threads are needed here to guarantee good enough performance
+   *       when less concurrent write handler threads. since sync is the most time-consuming
+   *       operation in the whole write process, multiple AsyncSyncer threads can provide
better
+   *       parallelism of sync to get better overall throughput
+   */
+  // thread to write locally buffered writes to HDFS
+  private class AsyncWriter extends HasThread {
+    private long pendingTxid = 0;
+    private long txidToWrite = 0;
+    private long lastWrittenTxid = 0;
+    private Object writeLock = new Object();
+
+    public AsyncWriter(String name) {
+      super(name);
+    }
+
+    // wake up (called by (write) handler thread) AsyncWriter thread
+    // to write buffered writes to HDFS
+    public void setPendingTxid(long txid) {
+      synchronized (this.writeLock) {
+        if (txid <= this.pendingTxid)
+          return;
 
-    LogSyncer(long optionalFlushInterval) {
-      this.optionalFlushInterval = optionalFlushInterval;
+        this.pendingTxid = txid;
+        this.writeLock.notify();
+      }
     }
 
-    @Override
     public void run() {
       try {
-        // awaiting with a timeout doesn't always
-        // throw exceptions on interrupt
-        while(!this.isInterrupted() && !closeLogSyncer.get()) {
+        while (!this.isInterrupted()) {
+          // 1. wait until there is new writes in local buffer
+          synchronized (this.writeLock) {
+            while (this.pendingTxid <= this.lastWrittenTxid) {
+              this.writeLock.wait();
+            }
+          }
 
+          // 2. get all buffered writes and update 'real' pendingTxid
+          //    since maybe newer writes enter buffer as AsyncWriter wakes
+          //    up and holds the lock
+          // NOTE! can't hold 'updateLock' here since rollWriter will pend
+          // on 'sync()' with 'updateLock', but 'sync()' will wait for
+          // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
+          // can leads to pendWrites more than pendingTxid, but not problem
+          List<Entry> pendWrites = null;
+          synchronized (pendingWritesLock) {
+            this.txidToWrite = unflushedEntries.get();
+            pendWrites = pendingWrites;
+            pendingWrites = new LinkedList<Entry>();
+          }
+
+          // 3. write all buffered writes to HDFS(append, without sync)
           try {
-            if (unflushedEntries.get() <= syncedTillHere) {
-              synchronized (closeLogSyncer) {
-                closeLogSyncer.wait(this.optionalFlushInterval);
-              }
+            for (Entry e : pendWrites) {
+              writer.append(e);
             }
-            // Calling sync since we waited or had unflushed entries.
-            // Entries appended but not sync'd are taken care of here AKA
-            // deferred log flush
-            sync();
-          } catch (IOException e) {
-            LOG.error("Error while syncing, requesting close of hlog ", e);
+          } catch(IOException e) {
+            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
             requestLogRoll();
-            Threads.sleep(this.optionalFlushInterval);
+
+            asyncIOE = e;
+            failedTxid.set(this.txidToWrite);
+          }
+
+          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
+          this.lastWrittenTxid = this.txidToWrite;
+          boolean hasIdleSyncer = false;
+          for (int i = 0; i < asyncSyncers.length; ++i) {
+            if (!asyncSyncers[i].isSyncing()) {
+              hasIdleSyncer = true;
+              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
+              break;
+            }
+          }
+          if (!hasIdleSyncer) {
+            int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
+            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
           }
         }
       } catch (InterruptedException e) {
-        LOG.debug(getName() + " interrupted while waiting for sync requests");
+        LOG.debug(getName() + " interrupted while waiting for " +
+            "newer writes added to local buffer");
+      } catch (Exception e) {
+        LOG.error("UNEXPECTED", e);
       } finally {
         LOG.info(getName() + " exiting");
       }
     }
+  }
 
-    // appends new writes to the pendingWrites. It is better to keep it in
-    // our own queue rather than writing it to the HDFS output stream because
-    // HDFSOutputStream.writeChunk is not lightweight at all.
-    synchronized void append(Entry e) throws IOException {
-      pendingWrites.add(e);
+  // thread to request HDFS to sync the WALEdits written by AsyncWriter
+  // to make those WALEdits durable on HDFS side
+  private class AsyncSyncer extends HasThread {
+    private long writtenTxid = 0;
+    private long txidToSync = 0;
+    private long lastSyncedTxid = 0;
+    private volatile boolean isSyncing = false;
+    private Object syncLock = new Object();
+
+    public AsyncSyncer(String name) {
+      super(name);
+    }
+
+    public boolean isSyncing() {
+      return this.isSyncing;
+    }
+
+    // wake up (called by AsyncWriter thread) AsyncSyncer thread
+    // to sync(flush) writes written by AsyncWriter in HDFS
+    public void setWrittenTxid(long txid) {
+      synchronized (this.syncLock) {
+        if (txid <= this.writtenTxid)
+          return;
+
+        this.writtenTxid = txid;
+        this.syncLock.notify();
+      }
     }
 
-    // Returns all currently pending writes. New writes
-    // will accumulate in a new list.
-    synchronized List<Entry> getPendingWrites() {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<Entry>();
-      return save;
+    public void run() {
+      try {
+        while (!this.isInterrupted()) {
+          // 1. wait until AsyncWriter has written data to HDFS and
+          //    called setWrittenTxid to wake up us
+          synchronized (this.syncLock) {
+            while (this.writtenTxid <= this.lastSyncedTxid) {
+              this.syncLock.wait();
+            }
+            this.txidToSync = this.writtenTxid;
+          }
+
+          // 2. do 'sync' to HDFS to provide durability
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          try {
+            this.isSyncing = true;
+            if (writer != null) writer.sync();
+            this.isSyncing = false;
+          } catch (IOException e) {
+            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
+            requestLogRoll();
+
+            asyncIOE = e;
+            failedTxid.set(this.txidToSync);
+          }
+          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+
+          // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
+          // handler threads on 'sync()'
+          this.lastSyncedTxid = this.txidToSync;
+          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);
+
+          // 4. check and do logRoll if needed
+          if (!logRollRunning) {
+            checkLowReplication();
+            try {
+              if (writer != null && writer.getLength() > logrollsize) {
+                requestLogRoll();
+              }
+            } catch (IOException e) {
+              LOG.warn("writer.getLength() failed,this failure won't block here");
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " interrupted while waiting for " +
+            "notification from AsyncWriter thread");
+      } catch (Exception e) {
+        LOG.error("UNEXPECTED", e);
+      } finally {
+        LOG.info(getName() + " exiting");
+      }
     }
+  }
 
-    // writes out pending entries to the HLog
-    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
-      if (pending == null) return;
+  // thread to notify all write handler threads which are pending on
+  // their written WALEdits' durability(sync)
+  // why an extra 'notifier' thread is needed rather than letting
+  // AsyncSyncer thread itself notifies when sync is done is to let
+  // AsyncSyncer thread do next sync as soon as possible since 'notify'
+  // has heavy synchronization with all pending write handler threads
+  private class AsyncNotifier extends HasThread {
+    private long flushedTxid = 0;
+    private long lastNotifiedTxid = 0;
+    private Object notifyLock = new Object();
+
+    public AsyncNotifier(String name) {
+      super(name);
+    }
+
+    public void setFlushedTxid(long txid) {
+      synchronized (this.notifyLock) {
+        if (txid <= this.flushedTxid) {
+          return;
+        }
 
-      // write out all accumulated Entries to hdfs.
-      for (Entry e : pending) {
-        writer.append(e);
+        this.flushedTxid = txid;
+        this.notifyLock.notify();
       }
     }
 
-    void close() {
-      synchronized (closeLogSyncer) {
-        closeLogSyncer.set(true);
-        closeLogSyncer.notifyAll();
+    public void run() {
+      try {
+        while (!this.isInterrupted()) {
+          synchronized (this.notifyLock) {
+            while (this.flushedTxid <= this.lastNotifiedTxid) {
+              this.notifyLock.wait();
+            }
+            this.lastNotifiedTxid = this.flushedTxid;
+          }
+
+          // notify(wake-up) all pending (write) handler thread
+          // (or logroller thread which also may pend on sync())
+          synchronized (syncedTillHere) {
+            syncedTillHere.set(this.lastNotifiedTxid);
+            syncedTillHere.notifyAll();
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " interrupted while waiting for " +
+            " notification from AsyncSyncer thread");
+      } catch (Exception e) {
+        LOG.error("UNEXPECTED", e);
+      } finally {
+        LOG.info(getName() + " exiting");
       }
     }
   }
@@ -1113,95 +1304,20 @@ class FSHLog implements HLog, Syncable {
 
   // sync all transactions upto the specified txid
   private void syncer(long txid) throws IOException {
-    // if the transaction that we are interested in is already
-    // synced, then return immediately.
-    if (txid <= this.syncedTillHere) {
-      return;
-    }
-    Writer tempWriter;
-    synchronized (this.updateLock) {
-      if (this.closed) return;
-      // Guaranteed non-null.
-      // Note that parallel sync can close tempWriter.
-      // The current method of dealing with this is to catch exceptions.
-      // See HBASE-4387, HBASE-5623, HBASE-7329.
-      tempWriter = this.writer;
-    }
-    try {
-      long doneUpto;
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      // First flush all the pending writes to HDFS. Then
-      // issue the sync to HDFS. If sync is successful, then update
-      // syncedTillHere to indicate that transactions till this
-      // number has been successfully synced.
-      IOException ioe = null;
-      List<Entry> pending = null;
-      synchronized (flushLock) {
-        if (txid <= this.syncedTillHere) {
-          return;
-        }
-        doneUpto = this.unflushedEntries.get();
-        pending = logSyncer.getPendingWrites();
-        try {
-          logSyncer.hlogFlush(tempWriter, pending);
-          postAppend(pending);
-        } catch(IOException io) {
-          ioe = io;
-          LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
-        }
-      }
-      if (ioe != null && pending != null) {
-        synchronized (this.updateLock) {
-          synchronized (flushLock) {
-            // HBASE-4387, HBASE-5623, retry with updateLock held
-            tempWriter = this.writer;
-            logSyncer.hlogFlush(tempWriter, pending);
-            postAppend(pending);
-          }
-        }
-      }
-      // another thread might have sync'ed avoid double-sync'ing
-      if (txid <= this.syncedTillHere) {
-        return;
-      }
-      try {
-        if (tempWriter != null) {
-          tempWriter.sync();
-          postSync();
-        }
-      } catch(IOException ex) {
-        synchronized (this.updateLock) {
-          // HBASE-4387, HBASE-5623, retry with updateLock held
-          // TODO: we don't actually need to do it for concurrent close - what is the point
-          //       of syncing new unrelated writer? Keep behavior for now.
-          tempWriter = this.writer;
-          if (tempWriter != null) {
-            tempWriter.sync();
-            postSync();
-          }
-        }
-      }
-      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
-
-      this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
-      // TODO: preserving the old behavior for now, but this check is strange. It's not
-      //       protected by any locks here, so for all we know rolling locks might start
-      //       as soon as we enter the "if". Is this best-effort optimization check?
-      if (!this.logRollRunning) {
-        checkLowReplication();
+    synchronized (this.syncedTillHere) {
+      while (this.syncedTillHere.get() < txid) {
         try {
-          curLogSize = tempWriter.getLength();
-          if (curLogSize > this.logrollsize) {
-            requestLogRoll();
+          this.syncedTillHere.wait();
+
+          if (txid <= this.failedTxid.get()) {
+            assert asyncIOE != null :
+              "current txid is among(under) failed txids, but asyncIOE is null!";
+            throw asyncIOE;
           }
-        } catch (IOException x) {
-          LOG.debug("Log roll failed and will be retried. (This is not an error)");
+        } catch (InterruptedException e) {
+          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
         }
       }
-    } catch (IOException e) {
-      LOG.fatal("Could not sync. Requesting roll of hlog", e);
-      requestLogRoll();
-      throw e;
     }
   }
 
@@ -1333,7 +1449,7 @@ class FSHLog implements HLog, Syncable {
           logKey.setScopes(null);
         }
         // write to our buffer for the Hlog file.
-        logSyncer.append(new FSHLog.Entry(logKey, logEdit));
+        this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
       }
       long took = EnvironmentEdgeManager.currentTimeMillis() - now;
       coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1489,7 +1605,7 @@ class FSHLog implements HLog, Syncable {
 
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncedTillHere;
+    return this.lastDeferredTxid > this.syncedTillHere.get();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java?rev=1550778&r1=1550777&r2=1550778&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
Fri Dec 13 17:32:09 2013
@@ -82,25 +82,27 @@ public class TestDurability {
     HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
 
     region.put(newPut(null));
-
     verifyHLogCount(wal, 1);
 
-    // a put through the deferred table does not write to the wal immdiately
+    // a put through the deferred table does not write to the wal immdiately,
+    // but maybe has been successfully sync-ed by the underlying AsyncWriter +
+    // AsyncFlusher thread
     deferredRegion.put(newPut(null));
-    verifyHLogCount(wal, 1);
     // but will after we sync the wal
     wal.sync();
     verifyHLogCount(wal, 2);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(null));
-    verifyHLogCount(wal, 2);
+    wal.sync();
+    verifyHLogCount(wal, 3);
     region.put(newPut(null));
     verifyHLogCount(wal, 4);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(Durability.USE_DEFAULT));
-    verifyHLogCount(wal, 4);
+    wal.sync();
+    verifyHLogCount(wal, 5);
     region.put(newPut(Durability.USE_DEFAULT));
     verifyHLogCount(wal, 6);
 
@@ -114,7 +116,6 @@ public class TestDurability {
     // async overrides sync table default
     region.put(newPut(Durability.ASYNC_WAL));
     deferredRegion.put(newPut(Durability.ASYNC_WAL));
-    verifyHLogCount(wal, 6);
     wal.sync();
     verifyHLogCount(wal, 8);
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java?rev=1550778&r1=1550777&r2=1550778&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
Fri Dec 13 17:32:09 2013
@@ -151,10 +151,8 @@ public class TestLogRollAbort {
     dfsCluster.restartDataNodes();
     LOG.info("Restarted datanodes");
 
-    assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries());
     try {
       log.rollWriter(true);
-      fail("Log roll should have triggered FailedLogCloseException");
     } catch (FailedLogCloseException flce) {
       assertTrue("Should have deferred flush log edits outstanding",
           ((FSHLog) log).hasDeferredEntries());



Mime
View raw message