Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 40D8C908C for ; Thu, 17 May 2012 15:56:26 +0000 (UTC) Received: (qmail 66641 invoked by uid 500); 17 May 2012 15:56:26 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 66606 invoked by uid 500); 17 May 2012 15:56:26 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 66599 invoked by uid 99); 17 May 2012 15:56:26 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 15:56:26 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 15:56:21 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D2A45238896F for ; Thu, 17 May 2012 15:55:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1339671 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Date: Thu, 17 May 2012 15:55:59 -0000 To: commits@hbase.apache.org From: tedyu@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120517155559.D2A45238896F@eris.apache.org> Author: tedyu Date: Thu May 17 15:55:59 2012 New Revision: 1339671 URL: http://svn.apache.org/viewvc?rev=1339671&view=rev Log: HBASE-5826 Improve sync of HLog edits (Todd) Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1339671&r1=1339670&r2=1339671&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu May 17 15:55:59 2012 @@ -48,6 +48,7 @@ import java.util.concurrent.locks.Reentr import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -139,8 +140,10 @@ public class HLog implements Syncable { private final long optionalFlushInterval; private final long blocksize; private final String prefix; - private final AtomicLong unflushedEntries = new AtomicLong(0); - private volatile long syncedTillHere = 0; + + /** tracking information about what has been synced */ + private SyncInfo syncInfo = new SyncInfo(); + private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; @@ -230,7 +233,6 @@ public class HLog implements Syncable { // during an update // locked during appends private final Object updateLock = new Object(); - private final Object flushLock = new Object(); private final boolean enabled; @@ -298,6 +300,7 @@ public class HLog implements Syncable { private static Metric writeSize = new Metric(); // For measuring latency of syncs private static Metric syncTime = new Metric(); + private static AtomicLong syncBatchSize = new AtomicLong(); //For measuring slow HLog appends private static AtomicLong slowHLogAppendCount = new AtomicLong(); private static Metric slowHLogAppendTime = new Metric(); @@ -314,6 +317,10 @@ public class HLog implements Syncable { return syncTime.get(); } + public static long getSyncBatchSize() { + return syncBatchSize.getAndSet(0); + } + public static long getSlowAppendCount() { return slowHLogAppendCount.get(); } @@ -833,17 +840,11 @@ public class HLog implements Syncable { try { // Wait till all current transactions are written to the hlog. // No new transactions can occur because we have the updatelock. - if (this.unflushedEntries.get() != this.syncedTillHere) { - LOG.debug("cleanupCurrentWriter " + - " waiting for transactions to get synced " + - " total " + this.unflushedEntries.get() + - " synced till here " + syncedTillHere); - sync(); - } + sync(); this.writer.close(); this.writer = null; closeErrorCount.set(0); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed close of HLog writer", e); int errors = closeErrorCount.incrementAndGet(); if (errors <= closeErrorsTolerated && !hasDeferredEntries()) { @@ -1006,7 +1007,7 @@ public class HLog implements Syncable { * @param logEdit * @param logKey * @param doSync shall we sync after writing the transaction - * @return The txid of this transaction + * @return The seqnum of this transaction * @throws IOException */ public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit, @@ -1015,9 +1016,9 @@ public class HLog implements Syncable { if (this.closed) { throw new IOException("Cannot append; log is closed"); } - long txid = 0; + long seqNum; synchronized (updateLock) { - long seqNum = obtainSeqNum(); + seqNum = obtainSeqNum(); logKey.setLogSeqNum(seqNum); // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. the first edit added to the particular @@ -1027,10 +1028,9 @@ public class HLog implements Syncable { this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), Long.valueOf(seqNum)); doWrite(regionInfo, logKey, logEdit, htd); - txid = this.unflushedEntries.incrementAndGet(); this.numEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; + lastDeferredTxid = seqNum; } } @@ -1040,9 +1040,9 @@ public class HLog implements Syncable { (regionInfo.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system - this.sync(txid); + this.sync(seqNum); } - return txid; + return seqNum; } /** @@ -1090,13 +1090,13 @@ public class HLog implements Syncable { private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd, boolean doSync) throws IOException { - if (edits.isEmpty()) return this.unflushedEntries.get();; + if (edits.isEmpty()) return this.logSeqNum.get(); if (this.closed) { throw new IOException("Cannot append; log is closed"); } - long txid = 0; + long seqNum; synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); + seqNum = obtainSeqNum(); // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. the first edit added to the particular // memstore). . When the cache is flushed, the entry for the @@ -1109,9 +1109,8 @@ public class HLog implements Syncable { HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); - txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; + lastDeferredTxid = seqNum; } } // Sync if catalog region, and if not then check if that table supports @@ -1120,9 +1119,9 @@ public class HLog implements Syncable { (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system - this.sync(txid); + this.sync(seqNum); } - return txid; + return seqNum; } /** @@ -1180,6 +1179,9 @@ public class HLog implements Syncable { // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); + long lastSeqAppended = -1; + long lastSeqFlushed = -1; + private Object flushLock = new Object(); LogSyncer(long optionalFlushInterval) { this.optionalFlushInterval = optionalFlushInterval; @@ -1193,7 +1195,7 @@ public class HLog implements Syncable { while(!this.isInterrupted() && !closeLogSyncer) { try { - if (unflushedEntries.get() <= syncedTillHere) { + if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) { Thread.sleep(this.optionalFlushInterval); } sync(); @@ -1213,24 +1215,45 @@ public class HLog implements Syncable { // our own queue rather than writing it to the HDFS output stream because // HDFSOutputStream.writeChunk is not lightweight at all. synchronized void append(Entry e) throws IOException { + long seq = e.getKey().getLogSeqNum(); + assert seq > lastSeqAppended; + lastSeqAppended = seq; pendingWrites.add(e); } // Returns all currently pending writes. New writes // will accumulate in a new list. - synchronized List getPendingWrites() { - List save = this.pendingWrites; - this.pendingWrites = new LinkedList(); - return save; - } + long flushWritesTo(Writer writer) throws IOException { + synchronized (flushLock) { + List pending; + + synchronized (this) { + pending = pendingWrites; + pendingWrites = new LinkedList(); + } - // writes out pending entries to the HLog - void hlogFlush(Writer writer, List pending) throws IOException { - if (pending == null) return; - - // write out all accumulated Entries to hdfs. - for (Entry e : pending) { - writer.append(e); + boolean success = false; + try { + int numFlushed = 0; + for (Entry e : pending) { + writer.append(e); + long seq = e.getKey().getLogSeqNum(); + assert seq > lastSeqFlushed; + lastSeqFlushed = seq; + numFlushed++; + } + syncBatchSize.addAndGet(numFlushed); + success = true; + } finally { + if (!success) { + // push back our batch into the pending list + synchronized (this) { + pending.addAll(pendingWrites); + pendingWrites = pending; + } + } + } + return lastSeqFlushed; } } @@ -1239,9 +1262,31 @@ public class HLog implements Syncable { } } + private static class SyncInfo { + private long syncedTillHere = 0; + + synchronized long getLastSyncedTxId() { + return syncedTillHere; + } + + synchronized void notifySynced(long txid) { + if (txid > syncedTillHere) { + syncedTillHere = txid; + } + notifyAll(); + } + + synchronized void waitForSync(long txid) throws InterruptedException { + while (syncedTillHere < txid) { + wait(); + } + } + } + + // sync all known transactions private void syncer() throws IOException { - syncer(this.unflushedEntries.get()); // sync all pending items + syncer(logSeqNum.get()); // sync all pending } // sync all transactions upto the specified txid @@ -1253,47 +1298,26 @@ public class HLog implements Syncable { } // if the transaction that we are interested in is already // synced, then return immediately. - if (txid <= this.syncedTillHere) { + if (syncInfo.getLastSyncedTxId() >= txid) { return; } try { - long doneUpto; long now = System.currentTimeMillis(); - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. - synchronized (flushLock) { - if (txid <= this.syncedTillHere) { - return; - } - doneUpto = this.unflushedEntries.get(); - List pending = logSyncerThread.getPendingWrites(); - try { - logSyncerThread.hlogFlush(tempWriter, pending); - } catch(IOException io) { - synchronized (this.updateLock) { - // HBASE-4387, HBASE-5623, retry with updateLock held - tempWriter = this.writer; - logSyncerThread.hlogFlush(tempWriter, pending); - } - } - } - // another thread might have sync'ed avoid double-sync'ing - if (txid <= this.syncedTillHere) { - return; - } + long flushedSeqId; try { + flushedSeqId = logSyncerThread.flushWritesTo(tempWriter); tempWriter.sync(); } catch(IOException io) { synchronized (this.updateLock) { // HBASE-4387, HBASE-5623, retry with updateLock held tempWriter = this.writer; + flushedSeqId = logSyncerThread.flushWritesTo(tempWriter); tempWriter.sync(); } } - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); - + syncInfo.notifySynced(flushedSeqId); + // We try to not acquire the updateLock just to update statistics. + // Make these statistics as AtomicLong. syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication(); @@ -1548,14 +1572,15 @@ public class HLog implements Syncable { if (this.closed) { return; } - long txid = 0; + long seqNumOfCompletionEdit; synchronized (updateLock) { + seqNumOfCompletionEdit = obtainSeqNum(); long now = System.currentTimeMillis(); - WALEdit edit = completeCacheFlushLogEdit(); - HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, + + WALEdit edit = completeCacheFlushLogEdit(logSeqId); + HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); logSyncerThread.append(new Entry(key, edit)); - txid = this.unflushedEntries.incrementAndGet(); writeTime.inc(System.currentTimeMillis() - now); long len = 0; for (KeyValue kv : edit.getKeyValues()) { @@ -1565,7 +1590,7 @@ public class HLog implements Syncable { this.numEntries.incrementAndGet(); } // sync txn to file system - this.sync(txid); + this.sync(seqNumOfCompletionEdit); } finally { // updateLock not needed for removing snapshot's entry @@ -1576,9 +1601,17 @@ public class HLog implements Syncable { } } - private WALEdit completeCacheFlushLogEdit() { + private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) { + // The data is not actually used here - we just need to write + // something to the log to make sure we're still the owner of the + // pipeline. + byte[] data = Bytes.add( + COMPLETE_CACHE_FLUSH, + ":".getBytes(Charsets.UTF_8), + Bytes.toBytes(String.valueOf(seqIdOfFlush))); + KeyValue kv = new KeyValue(METAROW, METAFAMILY, null, - System.currentTimeMillis(), COMPLETE_CACHE_FLUSH); + System.currentTimeMillis(), data); WALEdit e = new WALEdit(); e.add(kv); return e; @@ -1855,7 +1888,7 @@ public class HLog implements Syncable { /** Provide access to currently deferred sequence num for tests */ boolean hasDeferredEntries() { - return lastDeferredTxid > syncedTillHere; + return lastDeferredTxid > syncInfo.getLastSyncedTxId(); } /** Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1339671&r1=1339670&r2=1339671&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu May 17 15:55:59 2012 @@ -245,7 +245,7 @@ public class SequenceFileLogWriter imple @Override public void sync() throws IOException { - this.writer.syncFs(); + if (this.writer != null) this.writer.syncFs(); } @Override Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1339671&r1=1339670&r2=1339671&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu May 17 15:55:59 2012 @@ -548,8 +548,9 @@ public class TestHLog { KeyValue kv = val.getKeyValues().get(0); assertTrue(Bytes.equals(HLog.METAROW, kv.getRow())); assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily())); - assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH, - val.getKeyValues().get(0).getValue())); + assertTrue(Bytes.startsWith( + val.getKeyValues().get(0).getValue(), + HLog.COMPLETE_CACHE_FLUSH)); System.out.println(key + " " + val); } } finally { @@ -616,8 +617,9 @@ public class TestHLog { assertTrue(Bytes.equals(tableName, entry.getKey().getTablename())); assertTrue(Bytes.equals(HLog.METAROW, val.getRow())); assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily())); - assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH, - val.getValue())); + assertTrue(Bytes.startsWith( + val.getValue(), + HLog.COMPLETE_CACHE_FLUSH)); System.out.println(entry.getKey() + " " + val); } } finally {